| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.ha; |
| |
| import static org.apache.geode.cache.Region.Entry; |
| import static org.apache.geode.cache.Region.SEPARATOR; |
| import static org.apache.geode.distributed.ConfigurationProperties.DELTA_PROPAGATION; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.internal.cache.CacheServerImpl.generateNameForClientMsgsRegion; |
| import static org.apache.geode.internal.lang.SystemPropertyHelper.GEMFIRE_PREFIX; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.assertj.core.api.Assertions.fail; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| |
| import org.apache.logging.log4j.Logger; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Ignore; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.GemFireException; |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.CacheFactory; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.DiskStoreFactory; |
| import org.apache.geode.cache.EntryEvent; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionFactory; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.cache.util.CacheListenerAdapter; |
| import org.apache.geode.cache30.ClientServerTestCase; |
| import org.apache.geode.distributed.DistributedSystem; |
| import org.apache.geode.internal.AvailablePort; |
| import org.apache.geode.internal.cache.CacheServerImpl; |
| import org.apache.geode.internal.cache.InternalCacheServer; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessage; |
| import org.apache.geode.internal.cache.tier.sockets.ConflationDUnitTestHelper; |
| import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.test.awaitility.GeodeAwaitility; |
| import org.apache.geode.test.dunit.SerializableRunnableIF; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; |
| import org.apache.geode.test.dunit.rules.DistributedRule; |
| import org.apache.geode.test.junit.categories.ClientSubscriptionTest; |
| |
| /** |
| * This DUnit contains various tests to ensure new implementation of ha region queues works as |
| * expected. |
| * |
| * @since GemFire 5.7 |
| */ |
| @Category({ClientSubscriptionTest.class}) |
| public class HARQueueNewImplDUnitTest extends JUnit4DistributedTestCase { |
| |
| private static final String regionName = HARQueueNewImplDUnitTest.class.getSimpleName(); |
| private static final Map<Object, Object> map = new HashMap<>(); |
| |
| private static Cache cache = null; |
| private static VM serverVM0 = null; |
| private static VM serverVM1 = null; |
| private static VM clientVM1 = null; |
| private static VM clientVM2 = null; |
| |
| private static final Logger logger = LogService.getLogger(); |
| private static int numOfCreates = 0; |
| private static int numOfUpdates = 0; |
| private static int numOfInvalidates = 0; |
| private static Object[] deletedValues = null; |
| |
| private int PORT1; |
| private int PORT2; |
| |
| @Rule |
| public DistributedRule distributedRule = new DistributedRule(); |
| |
| /** |
| * Sets up the test. |
| */ |
| @Before |
| public void setUp() { |
| map.clear(); |
| |
| serverVM0 = VM.getVM(0); |
| serverVM1 = VM.getVM(1); |
| clientVM1 = VM.getVM(2); |
| clientVM2 = VM.getVM(3); |
| |
| PORT1 = serverVM0.invoke( |
| () -> HARQueueNewImplDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)); |
| PORT2 = serverVM1.invoke( |
| () -> HARQueueNewImplDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_ENTRY)); |
| |
| numOfCreates = 0; |
| numOfUpdates = 0; |
| numOfInvalidates = 0; |
| clientVM1.invoke(() -> { |
| numOfCreates = 0; |
| numOfUpdates = 0; |
| numOfInvalidates = 0; |
| }); |
| } |
| |
| /** |
| * Tears down the test. |
| */ |
| @After |
| public void tearDown() { |
| map.clear(); |
| |
| closeCache(); |
| clientVM1.invoke(HARQueueNewImplDUnitTest::closeCache); |
| clientVM2.invoke(HARQueueNewImplDUnitTest::closeCache); |
| |
| // Unset the isSlowStartForTesting flag |
| serverVM0.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| serverVM1.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| |
| // then close the servers |
| serverVM0.invoke(HARQueueNewImplDUnitTest::closeCache); |
| serverVM1.invoke(HARQueueNewImplDUnitTest::closeCache); |
| |
| |
| disconnectAllFromDS(); |
| } |
| |
| private void createCache(Properties props) throws Exception { |
| props.setProperty(DELTA_PROPAGATION, "false"); |
| DistributedSystem ds = getSystem(props); |
| ds.disconnect(); |
| ds = getSystem(props); |
| assertThat(ds).isNotNull(); |
| cache = CacheFactory.create(ds); |
| assertThat(cache).isNotNull(); |
| } |
| |
| public static Integer createServerCache() throws Exception { |
| return createServerCache(null); |
| } |
| |
| public static Integer createServerCache(String ePolicy) throws Exception { |
| return createServerCache(ePolicy, 1); |
| } |
| |
| public static Integer createServerCache(String ePolicy, Integer cap) throws Exception { |
| new HARQueueNewImplDUnitTest().createCache(new Properties()); |
| RegionFactory<Object, Object> factory = cache.createRegionFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setDataPolicy(DataPolicy.REPLICATE); |
| factory.create(regionName); |
| |
| int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); |
| CacheServer server1 = cache.addCacheServer(); |
| server1.setPort(port); |
| if (ePolicy != null) { |
| File overflowDirectory = new File("bsi_overflow_" + port); |
| overflowDirectory.mkdir(); |
| DiskStoreFactory dsf = cache.createDiskStoreFactory(); |
| File[] dirs1 = new File[] {overflowDirectory}; |
| |
| server1.getClientSubscriptionConfig().setEvictionPolicy(ePolicy); |
| server1.getClientSubscriptionConfig().setCapacity(cap); |
| // specify disk store for this server |
| server1.getClientSubscriptionConfig() |
| .setDiskStoreName(dsf.setDiskDirs(dirs1).create("bsi").getName()); |
| } |
| server1.start(); |
| return server1.getPort(); |
| } |
| |
| private static Integer createOneMoreBridgeServer(Boolean notifyBySubscription) throws Exception { |
| int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); |
| CacheServer server1 = cache.addCacheServer(); |
| server1.setPort(port); |
| server1.setNotifyBySubscription(notifyBySubscription); |
| server1.getClientSubscriptionConfig() |
| .setEvictionPolicy(HARegionQueue.HA_EVICTION_POLICY_MEMORY); |
| // let this server to use default disk store |
| server1.start(); |
| return server1.getPort(); |
| } |
| |
| public static void createClientCache(String host, Integer port1, Integer port2, String rLevel, |
| Boolean addListener) throws Exception { |
| System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", |
| "true"); |
| |
| Properties props = new Properties(); |
| props.setProperty(MCAST_PORT, "0"); |
| props.setProperty(LOCATORS, ""); |
| new HARQueueNewImplDUnitTest().createCache(props); |
| AttributesFactory<Object, Object> factory = new AttributesFactory<>(); |
| ClientServerTestCase |
| .configureConnectionPool(factory, host, port1, port2, true, |
| Integer.parseInt(rLevel), |
| 2, null, 1000, 250, |
| -2); |
| |
| factory.setScope(Scope.LOCAL); |
| |
| if (addListener) { |
| factory.addCacheListener(new CacheListenerAdapter<Object, Object>() { |
| @Override |
| public void afterInvalidate(EntryEvent event) { |
| logger.debug("Invalidate Event: <" + event.getKey() + ", " + event.getNewValue() + ">"); |
| numOfInvalidates++; |
| } |
| |
| @Override |
| public void afterCreate(EntryEvent event) { |
| logger.debug("Create Event: <" + event.getKey() + ", " + event.getNewValue() + ">"); |
| numOfCreates++; |
| } |
| |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| logger.debug("Update Event: <" + event.getKey() + ", " + event.getNewValue() + ">"); |
| numOfUpdates++; |
| } |
| }); |
| } |
| |
| cache.createRegion(regionName, factory.create()); |
| } |
| |
| public static void createClientCache(String host, Integer port1, Integer port2, String rLevel) |
| throws Exception { |
| createClientCache(host, port1, port2, rLevel, Boolean.FALSE); |
| } |
| |
| private static void registerInterestListAll() { |
| try { |
| Region<Object, Object> region = cache.getRegion(SEPARATOR + regionName); |
| assertThat(region).isNotNull(); |
| region.registerInterest("ALL_KEYS"); |
| } catch (GemFireException ex) { |
| fail("failed in registerInterestListAll", ex); |
| } |
| } |
| |
| private static void registerInterestList() { |
| try { |
| Region<Object, Object> region = cache.getRegion(SEPARATOR + regionName); |
| assertThat(region).isNotNull(); |
| region.registerInterest("k1"); |
| region.registerInterest("k3"); |
| region.registerInterest("k5"); |
| } catch (GemFireException ex) { |
| fail("failed while registering keys", ex); |
| } |
| } |
| |
| private static void putEntries() { |
| try { |
| |
| Region<Object, Object> region = cache.getRegion(SEPARATOR + regionName); |
| assertThat(region).isNotNull(); |
| |
| region.put("k1", "pv1"); |
| region.put("k2", "pv2"); |
| region.put("k3", "pv3"); |
| region.put("k4", "pv4"); |
| region.put("k5", "pv5"); |
| } catch (GemFireException ex) { |
| fail("failed in putEntries()", ex); |
| } |
| } |
| |
| public static void createEntries() { |
| try { |
| Region<Object, Object> region = cache.getRegion(SEPARATOR + regionName); |
| assertThat(region).isNotNull(); |
| |
| region.create("k1", "v1"); |
| region.create("k2", "v2"); |
| region.create("k3", "v3"); |
| region.create("k4", "v4"); |
| region.create("k5", "v5"); |
| } catch (GemFireException ex) { |
| fail("failed in createEntries()", ex); |
| } |
| } |
| |
| public static void createEntries(Long num) { |
| try { |
| Region<Object, Object> region = cache.getRegion(SEPARATOR + regionName); |
| assertThat(region).isNotNull(); |
| for (long i = 0; i < num; i++) { |
| region.create("k" + i, "v" + i); |
| } |
| } catch (GemFireException ex) { |
| fail("failed in createEntries(Long)", ex); |
| } |
| } |
| |
| private static void putHeavyEntries(Integer num) { |
| try { |
| byte[] val; |
| Region<Object, Object> region = cache.getRegion(SEPARATOR + regionName); |
| assertThat(region).isNotNull(); |
| for (long i = 0; i < num; i++) { |
| val = new byte[1024 * 1024 * 5]; // 5 MB |
| region.put("k0", val); |
| } |
| } catch (GemFireException ex) { |
| fail("failed in putHeavyEntries(Long)", ex); |
| } |
| } |
| |
| /** |
| * This test verifies that the client-messages-region does not store duplicate |
| * ClientUpdateMessageImpl instances, during a normal put path as well as the GII path. |
| */ |
| @Test |
| public void testClientMsgsRegionSize() throws Exception { |
| // slow start for dispatcher |
| serverVM0.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("30000")); |
| serverVM1.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("30000")); |
| |
| createClientCache(getServerHostName(), PORT1, PORT2, |
| "1"); |
| final String client1Host = getServerHostName(); |
| clientVM1.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client1Host, |
| PORT1, PORT2, "1")); |
| final String client2Host = getServerHostName(); |
| clientVM2.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client2Host, |
| PORT1, PORT2, "1")); |
| |
| registerInterestListAll(); |
| clientVM1.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| clientVM2.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| |
| serverVM1.invoke(HARQueueNewImplDUnitTest::stopServer); |
| |
| serverVM0.invoke((SerializableRunnableIF) HARQueueNewImplDUnitTest::createEntries); |
| |
| serverVM1.invoke(HARQueueNewImplDUnitTest::startServer); |
| |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest.verifyRegionSize(5, 5)); |
| serverVM1.invoke(() -> HARQueueNewImplDUnitTest.verifyRegionSize(5, 5)); |
| } |
| |
| /** |
| * This test verifies that the ha-region-queues increment the reference count of their respective |
| * HAEventWrapper instances in the client-messages-region correctly, during put as well as GII |
| * path. |
| */ |
| @Test |
| public void testRefCountForNormalAndGIIPut() throws Exception { |
| // slow start for dispatcher |
| serverVM0.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("240000")); |
| serverVM1.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("240000")); |
| |
| createClientCache(getServerHostName(), PORT1, PORT2, |
| "1"); |
| final String client1Host = getServerHostName(); |
| clientVM1.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client1Host, |
| PORT1, PORT2, "1")); |
| final String client2Host = getServerHostName(); |
| clientVM2.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client2Host, |
| PORT1, PORT2, "1")); |
| |
| registerInterestListAll(); |
| clientVM1.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| clientVM2.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| |
| serverVM1.invoke(HARQueueNewImplDUnitTest::stopServer); |
| |
| serverVM0.invoke((SerializableRunnableIF) HARQueueNewImplDUnitTest::createEntries); |
| |
| serverVM1.invoke(HARQueueNewImplDUnitTest::startServer); |
| |
| serverVM1.invoke(() -> ValidateRegionSizes(PORT2)); |
| serverVM0.invoke(() -> ValidateRegionSizes(PORT1)); |
| |
| |
| serverVM0.invoke(HARQueueNewImplDUnitTest::updateMapForVM0); |
| serverVM1.invoke(HARQueueNewImplDUnitTest::updateMapForVM1); |
| |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest.verifyQueueData( |
| PORT1)); |
| serverVM1.invoke(() -> HARQueueNewImplDUnitTest.verifyQueueData( |
| PORT2)); |
| } |
| |
| private void ValidateRegionSizes(int port) { |
| await().untilAsserted(() -> { |
| Region region = cache.getRegion(SEPARATOR + regionName); |
| Region<Object, Object> msgsRegion = |
| cache.getRegion(CacheServerImpl.generateNameForClientMsgsRegion(port)); |
| int clientMsgRegionSize = msgsRegion.size(); |
| int regionSize = region.size(); |
| assertThat(((5 == clientMsgRegionSize) && (5 == regionSize))).describedAs( |
| "Region sizes were not as expected after 60 seconds elapsed. Actual region size = " |
| + regionSize + "Actual client msg region size = " + clientMsgRegionSize) |
| .isTrue(); |
| }); |
| } |
| |
| /** |
| * This test verifies that the ha-region-queues decrement the reference count of their respective |
| * HAEventWrapper instances in the client-messages-region correctly, after the events have been |
| * peeked and removed from the queue. |
| */ |
| @Test |
| public void testRefCountForPeekAndRemove() throws Exception { |
| serverVM0.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("30000")); |
| |
| createClientCache(getServerHostName(), PORT1, PORT2, |
| "1"); |
| final String client1Host = getServerHostName(); |
| clientVM1.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client1Host, |
| PORT1, PORT2, "1")); |
| final String client2Host = getServerHostName(); |
| clientVM2.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client2Host, |
| PORT1, PORT2, "1")); |
| |
| registerInterestListAll(); |
| clientVM1.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| clientVM2.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| |
| serverVM0.invoke((SerializableRunnableIF) HARQueueNewImplDUnitTest::createEntries); |
| |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest.verifyRegionSize(5, 5)); |
| |
| serverVM0.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest |
| .waitTillMessagesAreDispatched(PORT1)); |
| |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest.verifyRegionSize(5, 0)); |
| } |
| |
| /** |
| * This test verifies that the processing of the QRM messages results in decrementing the |
| * reference count of corresponding HAEventWrapper instances, correctly. |
| */ |
| @Test |
| public void testRefCountForQRM() throws Exception { |
| serverVM0.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("30000")); |
| |
| createClientCache(getServerHostName(), PORT1, PORT2, |
| "1"); |
| final String client1Host = getServerHostName(); |
| clientVM1.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client1Host, |
| PORT1, PORT2, "1")); |
| final String client2Host = getServerHostName(); |
| clientVM2.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client2Host, |
| PORT1, PORT2, "1")); |
| |
| registerInterestListAll(); |
| clientVM1.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| clientVM2.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| |
| serverVM1.invoke(HARQueueNewImplDUnitTest::stopServer); |
| |
| serverVM0.invoke((SerializableRunnableIF) HARQueueNewImplDUnitTest::createEntries); |
| |
| serverVM1.invoke(HARQueueNewImplDUnitTest::startServer); |
| |
| serverVM1.invoke(() -> HARQueueNewImplDUnitTest.verifyRegionSize(5, 5)); |
| |
| serverVM0.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| |
| serverVM1.invoke(() -> HARQueueNewImplDUnitTest.verifyRegionSize(5, 0)); |
| } |
| |
| /** |
| * This test verifies that the destruction of a ha-region (caused by proxy/client disconnect), |
| * causes the reference count of all HAEventWrapper instances belonging to the ha-region-queue to |
| * be decremented by one, and makes it visible to the client-messages-region. |
| */ |
| @Test |
| public void testRefCountForDestroy() throws Exception { |
| // slow start for dispatcher |
| serverVM0.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("30000")); |
| serverVM1.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("30000")); |
| |
| createClientCache(getServerHostName(), PORT1, PORT2, |
| "1"); |
| final String client1Host = getServerHostName(); |
| clientVM1.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client1Host, |
| PORT1, PORT2, "1")); |
| final String client2Host = getServerHostName(); |
| clientVM2.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client2Host, |
| PORT1, PORT2, "1")); |
| |
| registerInterestListAll(); |
| clientVM1.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| clientVM2.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| // 1. stop the second server |
| serverVM1.invoke(HARQueueNewImplDUnitTest::stopServer); |
| |
| serverVM0.invoke((SerializableRunnableIF) HARQueueNewImplDUnitTest::createEntries); |
| // 3. start the second server. |
| serverVM1.invoke(HARQueueNewImplDUnitTest::startServer); |
| Thread.sleep(3000); |
| |
| clientVM1.invoke(HARQueueNewImplDUnitTest::closeCache); |
| |
| Thread.sleep(1000); |
| serverVM0.invoke(HARQueueNewImplDUnitTest::updateMap1); |
| serverVM1.invoke(HARQueueNewImplDUnitTest::updateMap1); |
| |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest.verifyQueueData( |
| PORT1)); |
| serverVM1.invoke(() -> HARQueueNewImplDUnitTest.verifyQueueData( |
| PORT2)); |
| |
| clientVM2.invoke(HARQueueNewImplDUnitTest::closeCache); |
| |
| serverVM0.invoke(HARQueueNewImplDUnitTest::updateMap2); |
| serverVM1.invoke(HARQueueNewImplDUnitTest::updateMap2); |
| |
| Thread.sleep(1000); |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest.verifyQueueData( |
| PORT1)); |
| serverVM1.invoke(() -> HARQueueNewImplDUnitTest.verifyQueueData( |
| PORT2)); |
| } |
| |
| /** |
| * Addresses the bug 39179. If a clientUpdateMessage is dispatched to the client while its GII was |
| * under way, then it should not be put into the HARegionQueue of a client at receiving server |
| * side. |
| */ |
| @Test |
| public void testConcurrentGIIAndDispatch() throws Exception { |
| serverVM0.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("40000")); |
| serverVM1.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("40000")); |
| |
| createClientCache(getServerHostName(), PORT1, PORT2, |
| "1"); |
| final String client1Host = getServerHostName(); |
| clientVM1.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client1Host, |
| PORT1, PORT2, "1")); |
| final String client2Host = getServerHostName(); |
| clientVM2.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client2Host, |
| PORT1, PORT2, "1")); |
| |
| registerInterestListAll(); |
| clientVM1.invoke(HARQueueNewImplDUnitTest::registerInterestListAll); |
| clientVM2.invoke(HARQueueNewImplDUnitTest::registerInterestListAll); |
| // 1. stop the second server |
| serverVM1.invoke(HARQueueNewImplDUnitTest::stopServer); |
| |
| serverVM0.invoke((SerializableRunnableIF) HARQueueNewImplDUnitTest::createEntries); |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest |
| .makeValuesOfSomeKeysNullInClientMsgsRegion(PORT1, new String[] {"k1", "k3"})); |
| // 3. start the second server. |
| serverVM1.invoke(HARQueueNewImplDUnitTest::startServer); |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest.verifyRegionSize(5, 3)); |
| |
| serverVM1.invoke(() -> HARQueueNewImplDUnitTest.verifyNullValuesInCMR( |
| PORT2, new String[] {"k1", "k3"})); |
| serverVM1.invoke(() -> HARQueueNewImplDUnitTest.verifyRegionSize(5, 3)); |
| |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest |
| .populateValuesOfSomeKeysInClientMsgsRegion(PORT1, new String[] {"k1", "k3"})); |
| |
| serverVM0.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| serverVM1.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| } |
| |
| /** |
| * This test verifies that when two BridgeServerImpl instances are created in a single VM, they do |
| * share the client-messages-region. |
| */ |
| @Test |
| public void testTwoBridgeServersInOneVMDoShareCMR() throws Exception { |
| // slow start for dispatcher |
| serverVM0.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("30000")); |
| |
| Integer port3 = serverVM0 |
| .invoke(() -> HARQueueNewImplDUnitTest.createOneMoreBridgeServer(Boolean.TRUE)); |
| |
| createClientCache(getServerHostName(), PORT1, port3, "0"); |
| final String client1Host = getServerHostName(); |
| clientVM1.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client1Host, |
| PORT1, PORT2, "1")); |
| final String client2Host = getServerHostName(); |
| clientVM2.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client2Host, |
| PORT1, PORT2, "1")); |
| |
| registerInterestListAll(); |
| clientVM1.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| clientVM2.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| |
| serverVM0.invoke((SerializableRunnableIF) HARQueueNewImplDUnitTest::createEntries); |
| |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest.verifyRegionSize(5, 5)); |
| serverVM0.invoke( |
| () -> HARQueueNewImplDUnitTest.verifyRegionSize(5, 5)); |
| } |
| |
| /** |
| * This test verifies that two clients, connected to two cache servers with different |
| * notifyBySubscription values, on a single VM, receive updates/invalidates depending upon their |
| * notifyBySubscription value. |
| */ |
| @Test |
| public void testUpdatesWithTwoBridgeServersInOneVM() throws Exception { |
| Integer port3 = serverVM0 |
| .invoke(() -> HARQueueNewImplDUnitTest.createOneMoreBridgeServer(Boolean.FALSE)); |
| |
| createClientCache(getServerHostName(), PORT1, PORT2, |
| "1", Boolean.TRUE); |
| final String client1Host = getServerHostName(); |
| clientVM1.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client1Host, port3, |
| PORT2, "1", Boolean.TRUE)); |
| |
| registerInterestListAll(); |
| clientVM1.invoke(HARQueueNewImplDUnitTest::registerInterestListAll); |
| |
| clientVM1.invoke((SerializableRunnableIF) HARQueueNewImplDUnitTest::createEntries); |
| serverVM0.invoke(HARQueueNewImplDUnitTest::putEntries); |
| |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest.waitTillMessagesAreDispatched(PORT1)); |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest.waitTillMessagesAreDispatched(port3)); |
| |
| // expect updates |
| verifyUpdatesReceived(); |
| // expect invalidates |
| clientVM1.invoke(HARQueueNewImplDUnitTest::verifyUpdatesReceived); |
| } |
| |
| /** |
| * This test verifies that the HAEventWrapper instances present in the client-messages-region give |
| * up the references to their respective ClientUpdateMessageImpl instances. |
| */ |
| @Test |
| public void testHAEventWrapperDoesNotHoldCUMOnceInsideCMR() throws Exception { |
| // slow start for dispatcher |
| serverVM0.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("30000")); |
| |
| createClientCache(getServerHostName(), PORT1, PORT2, |
| "1"); |
| final String client1Host = getServerHostName(); |
| clientVM1.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client1Host, |
| PORT1, PORT2, "1")); |
| final String client2Host = getServerHostName(); |
| clientVM2.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client2Host, |
| PORT1, PORT2, "1")); |
| |
| registerInterestListAll(); |
| clientVM1.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| clientVM2.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| |
| serverVM1.invoke(HARQueueNewImplDUnitTest::stopServer); |
| |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest.createEntries(1000L)); |
| |
| serverVM1.invoke(HARQueueNewImplDUnitTest::startServer); |
| Thread.sleep(2000); |
| |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest.verifyNullCUMReference(PORT1)); |
| serverVM1.invoke(() -> HARQueueNewImplDUnitTest.verifyNullCUMReference(PORT2)); |
| } |
| |
| /** |
| * This test verifies that client-messages-regions are not created for the cache servers who have |
| * eviction policy as 'none'. Instead, such cache servers will have simple HashMap structures. |
| * Also, it verifies that such a structure (referred to as haContainer, in general) is destroyed |
| * when its cache server is stopped. |
| */ |
| @Test |
| public void testCMRNotCreatedForNoneEvictionPolicy() throws Exception { |
| serverVM0.invoke(HARQueueNewImplDUnitTest::closeCache); |
| serverVM1.invoke(HARQueueNewImplDUnitTest::closeCache); |
| Thread.sleep(2000); |
| PORT1 = serverVM0.invoke( |
| () -> HARQueueNewImplDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_NONE)); |
| PORT2 = serverVM1.invoke( |
| () -> HARQueueNewImplDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_NONE)); |
| Boolean isRegion = Boolean.FALSE; |
| // slow start for dispatcher |
| serverVM0.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("30000")); |
| |
| createClientCache(getServerHostName(), PORT1, PORT2, |
| "1"); |
| final String client1Host = getServerHostName(); |
| clientVM1.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client1Host, |
| PORT1, PORT2, "1")); |
| final String client2Host = getServerHostName(); |
| clientVM2.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client2Host, |
| PORT1, PORT2, "1")); |
| |
| registerInterestListAll(); |
| clientVM1.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| clientVM2.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| |
| serverVM0 |
| .invoke(() -> HARQueueNewImplDUnitTest.verifyHaContainerType(isRegion, PORT1)); |
| serverVM1 |
| .invoke(() -> HARQueueNewImplDUnitTest.verifyHaContainerType(isRegion, PORT2)); |
| |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest.stopOneBridgeServer(PORT1)); |
| serverVM1.invoke(() -> HARQueueNewImplDUnitTest.stopOneBridgeServer(PORT2)); |
| |
| serverVM0.invoke( |
| () -> HARQueueNewImplDUnitTest.verifyHaContainerDestroyed(isRegion, PORT1)); |
| serverVM1.invoke( |
| () -> HARQueueNewImplDUnitTest.verifyHaContainerDestroyed(isRegion, PORT2)); |
| } |
| |
| /** |
| * This test verifies that client-messages-regions are created for the cache servers who have |
| * eviction policy either as 'mem' or as 'entry'. Also, it verifies that such a |
| * client-messages-region is destroyed when its cache server is stopped. |
| */ |
| @Test |
| public void testCMRCreatedForMemOrEntryEvictionPolicy() throws Exception { |
| Boolean isRegion = Boolean.TRUE; |
| // slow start for dispatcher |
| serverVM0.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("30000")); |
| |
| createClientCache(getServerHostName(), PORT1, PORT2, |
| "1"); |
| final String client1Host = getServerHostName(); |
| clientVM1.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client1Host, |
| PORT1, PORT2, "1")); |
| final String client2Host = getServerHostName(); |
| clientVM2.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client2Host, |
| PORT1, PORT2, "1")); |
| |
| registerInterestListAll(); |
| clientVM1.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| clientVM2.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| |
| serverVM0 |
| .invoke(() -> HARQueueNewImplDUnitTest.verifyHaContainerType(isRegion, PORT1)); |
| serverVM1 |
| .invoke(() -> HARQueueNewImplDUnitTest.verifyHaContainerType(isRegion, PORT2)); |
| |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest.stopOneBridgeServer(PORT1)); |
| serverVM1.invoke(() -> HARQueueNewImplDUnitTest.stopOneBridgeServer(PORT2)); |
| |
| serverVM0.invoke( |
| () -> HARQueueNewImplDUnitTest.verifyHaContainerDestroyed(isRegion, PORT1)); |
| serverVM1.invoke( |
| () -> HARQueueNewImplDUnitTest.verifyHaContainerDestroyed(isRegion, PORT2)); |
| } |
| |
| /** |
| * This test verifies that the Cache.rootRegions() method does not return the |
| * client-messages-region of any of the cache's attached cache servers. |
| */ |
| @Test |
| public void testCMRNotReturnedByRootRegionsMethod() throws Exception { |
| |
| createClientCache(getServerHostName(), PORT1, PORT2, |
| "1"); |
| final String client1Host = getServerHostName(); |
| clientVM1.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client1Host, |
| PORT1, PORT2, "1")); |
| final String client2Host = getServerHostName(); |
| clientVM2.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client2Host, |
| PORT1, PORT2, "1")); |
| |
| registerInterestListAll(); |
| clientVM1.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| clientVM2.invoke(HARQueueNewImplDUnitTest::registerInterestList); |
| |
| serverVM0.invoke((SerializableRunnableIF) HARQueueNewImplDUnitTest::createEntries); |
| |
| serverVM0.invoke( |
| () -> HARQueueNewImplDUnitTest.verifyRootRegionsDoesNotReturnCMR(PORT1)); |
| serverVM1.invoke( |
| () -> HARQueueNewImplDUnitTest.verifyRootRegionsDoesNotReturnCMR(PORT2)); |
| } |
| |
| /** |
| * This test verifies that the memory footprint of the ha region queues is less when ha-overflow |
| * is enabled (with an appropriate value of haCapacity) compared to when it is disabled, for the |
| * same amount of data feed. |
| */ |
| @Ignore("TODO") |
| @Test |
| public void testMemoryFootprintOfHARegionQueuesWithAndWithoutOverflow() throws Exception { |
| serverVM0.invoke(HARQueueNewImplDUnitTest::closeCache); |
| serverVM1.invoke(HARQueueNewImplDUnitTest::closeCache); |
| Thread.sleep(2000); |
| Integer numOfEntries = 30; |
| |
| PORT1 = serverVM0.invoke(() -> HARQueueNewImplDUnitTest |
| .createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY, 30)); |
| PORT2 = serverVM1.invoke( |
| () -> HARQueueNewImplDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_NONE)); |
| |
| serverVM0.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("60000")); |
| serverVM1.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("60000")); |
| |
| createClientCache(getServerHostName(), PORT1, PORT2, |
| "1"); |
| final String client1Host = getServerHostName(); |
| clientVM1.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client1Host, |
| PORT1, PORT2, "1")); |
| final String client2Host = getServerHostName(); |
| clientVM2.invoke(() -> HARQueueNewImplDUnitTest.createClientCache(client2Host, |
| PORT1, PORT2, "1")); |
| |
| registerInterestListAll(); |
| clientVM1.invoke(HARQueueNewImplDUnitTest::registerInterestListAll); |
| clientVM2.invoke(HARQueueNewImplDUnitTest::registerInterestListAll); |
| |
| serverVM0.invoke(() -> HARQueueNewImplDUnitTest.putHeavyEntries(numOfEntries)); |
| |
| Long usedMemInVM0 = serverVM0.invoke(() -> HARQueueNewImplDUnitTest |
| .getUsedMemoryAndVerifyRegionSize(numOfEntries, PORT1)); |
| Long usedMemInVM1 = serverVM1.invoke(() -> HARQueueNewImplDUnitTest |
| .getUsedMemoryAndVerifyRegionSize(numOfEntries, -1)); |
| |
| serverVM0.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| serverVM1.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| |
| logger.debug("Used Mem: " + usedMemInVM1 + "(without overflow), " |
| + usedMemInVM0 + "(with overflow)"); |
| |
| assertThat(usedMemInVM0 < usedMemInVM1).isTrue(); |
| } |
| |
| private static void verifyNullCUMReference(Integer port) { |
| Region<Object, Object> region = |
| cache.getRegion(SEPARATOR + CacheServerImpl.generateNameForClientMsgsRegion(port)); |
| assertThat(region).isNotNull(); |
| |
| Object[] arr = region.keySet().toArray(); |
| for (Object o : arr) { |
| assertThat(((HAEventWrapper) o).getClientUpdateMessage()).isNull(); |
| } |
| |
| } |
| |
| private static void verifyHaContainerDestroyed(Boolean isRegion, Integer port) { |
| Map region = cache.getRegion(SEPARATOR + CacheServerImpl.generateNameForClientMsgsRegion(port)); |
| |
| if (isRegion) { |
| if (region != null) { |
| assertThat(((Region) region).isDestroyed()).isTrue(); |
| } |
| } else { |
| region = ((InternalCacheServer) cache.getCacheServers().toArray()[0]).getAcceptor() |
| .getCacheClientNotifier().getHaContainer(); |
| if (region != null) { |
| assertThat(region.isEmpty()).isTrue(); |
| } |
| } |
| } |
| |
| private static Long getUsedMemoryAndVerifyRegionSize(Integer haContainerSize, |
| Integer port) { |
| Long retVal; |
| retVal = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); |
| if (port != -1) { |
| verifyRegionSize(1, haContainerSize); |
| } else { |
| verifyRegionSize(haContainerSize); |
| } |
| return retVal; |
| } |
| |
| private static void stopOneBridgeServer(Integer port) { |
| Iterator iterator = cache.getCacheServers().iterator(); |
| if (iterator.hasNext()) { |
| CacheServer server = (CacheServer) iterator.next(); |
| if (server.getPort() == port) { |
| server.stop(); |
| } |
| } |
| } |
| |
| public static void stopServer() { |
| Iterator iterator = cache.getCacheServers().iterator(); |
| if (iterator.hasNext()) { |
| CacheServer server = (CacheServer) iterator.next(); |
| server.stop(); |
| } |
| } |
| |
| private static void updateMapForVM0() { |
| map.put("k1", 3L); |
| map.put("k2", 1L); |
| map.put("k3", 3L); |
| map.put("k4", 1L); |
| map.put("k5", 3L); |
| } |
| |
| private static void updateMap1() { |
| map.put("k1", 2L); |
| map.put("k2", 1L); |
| map.put("k3", 2L); |
| map.put("k4", 1L); |
| map.put("k5", 2L); |
| } |
| |
| private static void updateMap2() { |
| map.put("k1", 1L); |
| map.put("k2", 1L); |
| map.put("k3", 1L); |
| map.put("k4", 1L); |
| map.put("k5", 1L); |
| |
| } |
| |
| private static void updateMapForVM1() { |
| updateMapForVM0(); |
| } |
| |
| private static void verifyNullValuesInCMR(final Integer port, |
| String[] keys) { |
| final Region<Object, Object> msgsRegion = |
| cache.getRegion(generateNameForClientMsgsRegion(port)); |
| |
| GeodeAwaitility.await().until(() -> msgsRegion.size() == 3); |
| |
| Set entries = msgsRegion.entrySet(); |
| Iterator iterator = entries.iterator(); |
| for (; iterator.hasNext();) { |
| Entry entry = (Entry) iterator.next(); |
| ClientUpdateMessage cum = (ClientUpdateMessage) entry.getValue(); |
| for (String key : keys) { |
| logger.debug("cum.key: " + cum.getKeyToConflate()); |
| // assert that the keys are not present in entries set |
| assertThat(!key.equals(cum.getKeyToConflate())).isTrue(); |
| } |
| } |
| } |
| |
| private static void makeValuesOfSomeKeysNullInClientMsgsRegion(Integer port, String[] keys) { |
| Region<Object, Object> msgsRegion = |
| cache.getRegion(CacheServerImpl.generateNameForClientMsgsRegion(port)); |
| assertThat(msgsRegion).isNotNull(); |
| |
| Set entries = msgsRegion.entrySet(); |
| Iterator iterator = entries.iterator(); |
| deletedValues = new Object[keys.length]; |
| while (iterator.hasNext()) { |
| Region.Entry entry = (Region.Entry) iterator.next(); |
| ClientUpdateMessage cum = (ClientUpdateMessage) entry.getValue(); |
| for (int i = 0; i < keys.length; i++) { |
| if (keys[i].equals(cum.getKeyToConflate())) { |
| logger.debug("HARQueueNewImplDUnit: Removing " + cum.getKeyOfInterest()); |
| deletedValues[i] = msgsRegion.remove(entry.getKey()); |
| } |
| } |
| } |
| } |
| |
| private static void populateValuesOfSomeKeysInClientMsgsRegion(Integer port, String[] keys) { |
| Region<Object, Object> msgsRegion = |
| cache.getRegion(CacheServerImpl.generateNameForClientMsgsRegion(port)); |
| assertThat(msgsRegion).isNotNull(); |
| |
| for (int i = 0; i < keys.length; i++) { |
| logger.debug("HARQueueNewImplDUnit: populating " + deletedValues[i]); |
| msgsRegion.put(keys[1], deletedValues[i]); |
| } |
| } |
| |
| public static void startServer() throws IOException { |
| |
| Iterator iterator = cache.getCacheServers().iterator(); |
| if (iterator.hasNext()) { |
| CacheServer server = (CacheServer) iterator.next(); |
| server.start(); |
| } |
| |
| } |
| |
| private static void verifyQueueData(Integer port) { |
| // Get the clientMessagesRegion and check the size. |
| Region<Object, Object> msgsRegion = |
| cache.getRegion(CacheServerImpl.generateNameForClientMsgsRegion(port)); |
| Region region = cache.getRegion(SEPARATOR + regionName); |
| logger.debug( |
| "size<serverRegion, clientMsgsRegion>: " + region.size() + ", " + msgsRegion.size()); |
| assertThat(region.size()).isEqualTo(((Integer) 5).intValue()); |
| assertThat(msgsRegion.size()).isEqualTo(((Integer) 5).intValue()); |
| |
| for (Object o : msgsRegion.entrySet()) { |
| await().untilAsserted(() -> { |
| Entry entry = (Entry) o; |
| HAEventWrapper wrapper = (HAEventWrapper) entry.getKey(); |
| ClientUpdateMessage cum = (ClientUpdateMessage) entry.getValue(); |
| Object key = cum.getKeyOfInterest(); |
| logger.debug("key<feedCount, regionCount>: " + key + "<" |
| + map.get(key) + ", " + wrapper.getReferenceCount() + ">"); |
| assertThat(wrapper.getReferenceCount()).isEqualTo(((Long) map.get(key)).longValue()); |
| }); |
| } |
| } |
| |
| private static void verifyRegionSize(final Integer regionSize, final Integer msgsRegionSize) { |
| GeodeAwaitility.await().until(() -> { |
| // Get the clientMessagesRegion and check the size. |
| Region<Object, Object> region = cache.getRegion(SEPARATOR + regionName); |
| int sz = region.size(); |
| if (regionSize != sz) { |
| return false; |
| } |
| |
| Iterator iterator = cache.getCacheServers().iterator(); |
| if (iterator.hasNext()) { |
| InternalCacheServer server = (InternalCacheServer) iterator.next(); |
| Map msgsRegion = server.getAcceptor().getCacheClientNotifier().getHaContainer(); |
| |
| sz = msgsRegion.size(); |
| return msgsRegionSize == sz; |
| } |
| return true; |
| }); |
| } |
| |
| private static void verifyRegionSize(final Integer msgsRegionSize) { |
| |
| GeodeAwaitility.await().until(() -> { |
| try { |
| // Get the clientMessagesRegion and check the size. |
| Region<Object, Object> region = cache.getRegion(SEPARATOR + regionName); |
| int sz = region.size(); |
| if (sz != 1) { |
| return false; |
| } |
| Iterator iterator = cache.getCacheServers().iterator(); |
| if (!iterator.hasNext()) { |
| return true; |
| } |
| InternalCacheServer server = (InternalCacheServer) iterator.next(); |
| sz = server.getAcceptor().getCacheClientNotifier().getHaContainer().size(); |
| return sz == msgsRegionSize; |
| } catch (Exception e) { |
| return false; |
| } |
| }); |
| } |
| |
| private static void verifyHaContainerType(Boolean isRegion, Integer port) { |
| Map<Object, Object> haMap = |
| cache.getRegion(CacheServerImpl.generateNameForClientMsgsRegion(port)); |
| if (isRegion) { |
| assertThat(haMap).isNotNull(); |
| assertThat(haMap instanceof LocalRegion).isTrue(); |
| haMap = (Map<Object, Object>) ((InternalCacheServer) cache.getCacheServers().toArray()[0]) |
| .getAcceptor() |
| .getCacheClientNotifier().getHaContainer(); |
| assertThat(haMap).isNotNull(); |
| assertThat(haMap instanceof HAContainerRegion).isTrue(); |
| } else { |
| assertThat(haMap).isNull(); |
| haMap = (Map<Object, Object>) ((InternalCacheServer) cache.getCacheServers().toArray()[0]) |
| .getAcceptor() |
| .getCacheClientNotifier().getHaContainer(); |
| assertThat(haMap).isNotNull(); |
| assertThat(haMap instanceof HAContainerMap).isTrue(); |
| } |
| } |
| |
| private static void verifyRootRegionsDoesNotReturnCMR(Integer port) { |
| String cmrName = CacheServerImpl.generateNameForClientMsgsRegion(port); |
| Map<Object, Object> haMap = cache.getRegion(cmrName); |
| assertThat(haMap).isNotNull(); |
| String rName; |
| |
| for (Region<?, ?> region : cache.rootRegions()) { |
| rName = region.getName(); |
| if (cmrName.equals(rName)) { |
| throw new AssertionError( |
| "Cache.rootRegions() method should not return the client_messages_region."); |
| } |
| logger.debug("Region name returned from cache.rootRegions(): " + rName); |
| } |
| } |
| |
| private static void verifyUpdatesReceived() { |
| GeodeAwaitility.await().until(() -> 5 == numOfUpdates); |
| } |
| |
| private static void waitTillMessagesAreDispatched(Integer port) { |
| Map haContainer; |
| haContainer = cache.getRegion( |
| SEPARATOR + generateNameForClientMsgsRegion(port)); |
| if (haContainer == null) { |
| Object[] servers = cache.getCacheServers().toArray(); |
| for (Object server : servers) { |
| if (port == ((InternalCacheServer) server).getPort()) { |
| haContainer = ((InternalCacheServer) server).getAcceptor().getCacheClientNotifier() |
| .getHaContainer(); |
| break; |
| } |
| } |
| } |
| final Map m = haContainer; |
| GeodeAwaitility.await().until(() -> m.size() == 0); |
| } |
| |
| public static void closeCache() { |
| if (cache != null && !cache.isClosed()) { |
| cache.close(); |
| cache.getDistributedSystem().getDistributedMember(); |
| } |
| } |
| |
| } |