Merge pull request #4892 from apache/feature/GEODE-7939

GEODE-7939: membership heartbeat messages aren't designated high-prio…
diff --git a/boms/geode-all-bom/src/test/resources/expected-pom.xml b/boms/geode-all-bom/src/test/resources/expected-pom.xml
index ddfe4d2..787a47b 100644
--- a/boms/geode-all-bom/src/test/resources/expected-pom.xml
+++ b/boms/geode-all-bom/src/test/resources/expected-pom.xml
@@ -394,7 +394,7 @@
       <dependency>
         <groupId>org.apache.shiro</groupId>
         <artifactId>shiro-core</artifactId>
-        <version>1.4.1</version>
+        <version>1.5.2</version>
         <scope>compile</scope>
       </dependency>
       <dependency>
diff --git a/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy b/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy
index 37beb88..62132af 100644
--- a/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy
+++ b/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy
@@ -42,7 +42,7 @@
     deps.put("jgroups.version", "3.6.14.Final")
     deps.put("log4j.version", "2.12.1")
     deps.put("micrometer.version", "1.2.1")
-    deps.put("shiro.version", "1.4.1")
+    deps.put("shiro.version", "1.5.2")
     deps.put("slf4j-api.version", "1.7.28")
 
     // These version numbers are used in testing various versions of tomcat and are consumed explicitly
diff --git a/geode-assembly/src/integrationTest/resources/assembly_content.txt b/geode-assembly/src/integrationTest/resources/assembly_content.txt
index 2c08797..2af4eee 100644
--- a/geode-assembly/src/integrationTest/resources/assembly_content.txt
+++ b/geode-assembly/src/integrationTest/resources/assembly_content.txt
@@ -1051,15 +1051,15 @@
 lib/protobuf-java-3.10.0.jar
 lib/ra.jar
 lib/rmiio-2.1.2.jar
-lib/shiro-cache-1.4.1.jar
-lib/shiro-config-core-1.4.1.jar
-lib/shiro-config-ogdl-1.4.1.jar
-lib/shiro-core-1.4.1.jar
-lib/shiro-crypto-cipher-1.4.1.jar
-lib/shiro-crypto-core-1.4.1.jar
-lib/shiro-crypto-hash-1.4.1.jar
-lib/shiro-event-1.4.1.jar
-lib/shiro-lang-1.4.1.jar
+lib/shiro-cache-1.5.2.jar
+lib/shiro-config-core-1.5.2.jar
+lib/shiro-config-ogdl-1.5.2.jar
+lib/shiro-core-1.5.2.jar
+lib/shiro-crypto-cipher-1.5.2.jar
+lib/shiro-crypto-core-1.5.2.jar
+lib/shiro-crypto-hash-1.5.2.jar
+lib/shiro-event-1.5.2.jar
+lib/shiro-lang-1.5.2.jar
 lib/slf4j-api-1.7.28.jar
 lib/snappy-0.4.jar
 lib/spring-beans-5.2.1.RELEASE.jar
diff --git a/geode-assembly/src/integrationTest/resources/dependency_classpath.txt b/geode-assembly/src/integrationTest/resources/dependency_classpath.txt
index 578b7ab..7bed289 100644
--- a/geode-assembly/src/integrationTest/resources/dependency_classpath.txt
+++ b/geode-assembly/src/integrationTest/resources/dependency_classpath.txt
@@ -34,8 +34,8 @@
 javax.activation-1.2.0.jar
 istack-commons-runtime-3.0.9.jar
 commons-validator-1.6.jar
-shiro-core-1.4.1.jar
-shiro-config-ogdl-1.4.1.jar
+shiro-core-1.5.2.jar
+shiro-config-ogdl-1.5.2.jar
 commons-beanutils-1.9.4.jar
 commons-collections-3.2.2.jar
 commons-io-2.6.jar
@@ -56,13 +56,13 @@
 httpcore-4.4.12.jar
 snappy-0.4.jar
 jgroups-3.6.14.Final.jar
-shiro-cache-1.4.1.jar
-shiro-crypto-hash-1.4.1.jar
-shiro-crypto-cipher-1.4.1.jar
-shiro-config-core-1.4.1.jar
-shiro-event-1.4.1.jar
-shiro-crypto-core-1.4.1.jar
-shiro-lang-1.4.1.jar
+shiro-cache-1.5.2.jar
+shiro-crypto-hash-1.5.2.jar
+shiro-crypto-cipher-1.5.2.jar
+shiro-config-core-1.5.2.jar
+shiro-event-1.5.2.jar
+shiro-crypto-core-1.5.2.jar
+shiro-lang-1.5.2.jar
 slf4j-api-1.7.28.jar
 swagger-annotations-1.5.23.jar
 spring-core-5.2.1.RELEASE.jar
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleHopDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleHopDUnitTest.java
index dcd5c4f..4ecba96 100755
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleHopDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleHopDUnitTest.java
@@ -16,35 +16,40 @@
 
 import static java.util.Arrays.asList;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.cache.RegionShortcut.LOCAL;
+import static org.apache.geode.cache.RegionShortcut.PARTITION;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT;
 import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.internal.cache.util.UncheckedUtils.cast;
 import static org.apache.geode.internal.lang.SystemPropertyHelper.GEMFIRE_PREFIX;
 import static org.apache.geode.management.ManagementService.getExistingManagementService;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
-import static org.apache.geode.test.dunit.Disconnect.disconnectAllFromDS;
 import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
 import static org.apache.geode.test.dunit.VM.getController;
 import static org.apache.geode.test.dunit.VM.getVM;
 import static org.apache.geode.test.dunit.VM.getVMId;
+import static org.apache.geode.test.dunit.rules.DistributedRule.getDistributedSystemProperties;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.io.UncheckedIOException;
+import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import org.junit.After;
 import org.junit.Before;
@@ -54,32 +59,37 @@
 
 import org.apache.geode.DataSerializable;
 import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.PartitionResolver;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
-import org.apache.geode.cache.Scope;
-import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.PoolManager;
 import org.apache.geode.cache.client.internal.ClientMetadataService;
 import org.apache.geode.cache.client.internal.ClientPartitionAdvisor;
+import org.apache.geode.cache.client.internal.InternalClientCache;
 import org.apache.geode.cache.execute.FunctionAdapter;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.RegionFunctionContext;
 import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.distributed.Locator;
+import org.apache.geode.distributed.LocatorLauncher;
+import org.apache.geode.distributed.ServerLauncher;
 import org.apache.geode.distributed.internal.ServerLocation;
-import org.apache.geode.internal.AvailablePort;
 import org.apache.geode.internal.cache.BucketAdvisor.ServerBucketProfile;
+import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
 import org.apache.geode.management.ManagementService;
 import org.apache.geode.management.membership.MembershipEvent;
 import org.apache.geode.management.membership.UniversalMembershipListenerAdapter;
 import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.DUnitEnv;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.rules.CacheRule;
 import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
 import org.apache.geode.test.dunit.rules.DistributedRule;
 import org.apache.geode.test.junit.categories.ClientServerTest;
@@ -89,21 +99,24 @@
 @SuppressWarnings("serial")
 public class PartitionedRegionSingleHopDUnitTest implements Serializable {
 
-  private static final String PR_NAME = "single_hop_pr";
-  private static final String ORDER = "ORDER";
-  private static final String CUSTOMER = "CUSTOMER";
-  private static final String SHIPMENT = "SHIPMENT";
+  private static final String PARTITIONED_REGION_NAME = "single_hop_pr";
+  private static final String ORDER_REGION_NAME = "ORDER";
+  private static final String CUSTOMER_REGION_NAME = "CUSTOMER";
+  private static final String SHIPMENT_REGION_NAME = "SHIPMENT";
+  private static final String REPLICATE_REGION_NAME = "rr";
+
   private static final int LOCAL_MAX_MEMORY_DEFAULT = -1;
 
-  private static final AtomicReference<CountDownLatch> LATCH = new AtomicReference<>();
+  private static final ServerLauncher DUMMY_SERVER = mock(ServerLauncher.class);
+  private static final LocatorLauncher DUMMY_LOCATOR = mock(LocatorLauncher.class);
+  private static final InternalClientCache DUMMY_CLIENT = mock(InternalClientCache.class);
+  private static final InternalCache DUMMY_CACHE = mock(InternalCache.class);
+  private static final AtomicReference<ServerLauncher> SERVER = new AtomicReference<>();
+  private static final AtomicReference<LocatorLauncher> LOCATOR = new AtomicReference<>();
+  private static final AtomicReference<InternalClientCache> CLIENT = new AtomicReference<>();
+  private static final AtomicReference<InternalCache> CACHE = new AtomicReference<>();
 
-  private static volatile Region<Object, Object> testRegion;
-  private static volatile Region<Object, Object> customerRegion;
-  private static volatile Region<Object, Object> orderRegion;
-  private static volatile Region<Object, Object> shipmentRegion;
-  private static volatile Region<Object, Object> replicatedRegion;
-  private static volatile InternalCache cache;
-  private static volatile Locator locator;
+  private static final AtomicReference<CountDownLatch> LATCH = new AtomicReference<>();
 
   private String diskStoreName;
 
@@ -115,8 +128,6 @@
   @Rule
   public DistributedRule distributedRule = new DistributedRule();
   @Rule
-  public CacheRule cacheRule = new CacheRule();
-  @Rule
   public DistributedRestoreSystemProperties restoreProps = new DistributedRestoreSystemProperties();
   @Rule
   public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
@@ -131,24 +142,27 @@
     vm1 = getVM(1);
     vm2 = getVM(2);
     vm3 = getVM(3);
+
+    for (VM vm : asList(getController(), vm0, vm1, vm2, vm3)) {
+      vm.invoke(() -> {
+        CLIENT.set(DUMMY_CLIENT);
+        CACHE.set(DUMMY_CACHE);
+        SERVER.set(DUMMY_SERVER);
+        LOCATOR.set(DUMMY_LOCATOR);
+      });
+    }
   }
 
   @After
   public void tearDown() {
     for (VM vm : asList(getController(), vm0, vm1, vm2, vm3)) {
       vm.invoke(() -> {
-        cacheRule.closeAndNullCache();
-        locator = null;
-        cache = null;
-        testRegion = null;
-        customerRegion = null;
-        orderRegion = null;
-        shipmentRegion = null;
-        replicatedRegion = null;
+        CLIENT.getAndSet(DUMMY_CLIENT).close();
+        CACHE.getAndSet(DUMMY_CACHE).close();
+        SERVER.getAndSet(DUMMY_SERVER).stop();
+        LOCATOR.getAndSet(DUMMY_LOCATOR).stop();
       });
     }
-
-    disconnectAllFromDS();
   }
 
   /**
@@ -156,13 +170,11 @@
    */
   @Test
   public void testNoClient() throws Exception {
-    vm0.invoke(() -> createServer(1, 4));
-    vm1.invoke(() -> createServer(1, 4));
-
-    vm2.invoke(() -> createPeer());
-    vm3.invoke(() -> createPeer());
-
-    createAccessorServer();
+    vm0.invoke(() -> createServer(-1, 1, 4));
+    vm1.invoke(() -> createServer(-1, 1, 4));
+    vm2.invoke(() -> createAccessorPeer(1, 4));
+    vm3.invoke(() -> createAccessorPeer(1, 4));
+    createAccessorServer(1, 4);
 
     for (VM vm : asList(getController(), vm0, vm1, vm2, vm3)) {
       vm.invoke(() -> clearMetadata());
@@ -173,15 +185,17 @@
     }
 
     for (VM vm : asList(getController(), vm0, vm1, vm2, vm3)) {
-      vm.invoke(() -> getFromPartitionedRegions());
+      vm.invoke(() -> doGets());
     }
 
     for (VM vm : asList(getController(), vm0, vm1, vm2, vm3)) {
-      vm.invoke(() -> verifyEmptyMetadata());
-    }
+      vm.invoke(() -> {
+        ClientMetadataService clientMetadataService =
+            getInternalCache(SERVER.get()).getClientMetadataService();
 
-    for (VM vm : asList(getController(), vm0, vm1, vm2, vm3)) {
-      vm.invoke(() -> verifyEmptyStaticData());
+        assertThat(clientMetadataService.getClientPRMetadata_TEST_ONLY()).isEmpty();
+        assertThat(clientMetadataService.getClientPartitionAttributesMap()).isEmpty();
+      });
     }
   }
 
@@ -191,21 +205,19 @@
    */
   @Test
   public void testClientConnectedToAccessors() {
-    int port0 = vm0.invoke(() -> createAccessorServer());
-    int port1 = vm1.invoke(() -> createAccessorServer());
-
-    vm2.invoke(() -> createPeer());
-    vm3.invoke(() -> createPeer());
-
-    createClient(port0, port1);
+    int port0 = vm0.invoke(() -> createAccessorServer(1, 4));
+    int port1 = vm1.invoke(() -> createAccessorServer(1, 4));
+    vm2.invoke(() -> createAccessorPeer(1, 4));
+    vm3.invoke(() -> createAccessorPeer(1, 4));
+    createClient(250, true, true, true, port0, port1);
 
     putIntoPartitionedRegions();
+    doGets();
 
-    getFromPartitionedRegions();
+    ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService();
 
-    verifyEmptyMetadata();
-
-    verifyEmptyStaticData();
+    assertThat(clientMetadataService.getClientPRMetadata_TEST_ONLY()).isEmpty();
+    assertThat(clientMetadataService.getClientPartitionAttributesMap()).isEmpty();
   }
 
   /**
@@ -214,22 +226,19 @@
    */
   @Test
   public void testClientConnectedTo1Server() {
-    int port0 = vm0.invoke(() -> createServer(1, 4));
-
-    vm1.invoke(() -> createPeer());
-    vm2.invoke(() -> createPeer());
-
-    vm3.invoke(() -> createAccessorServer());
-
-    createClient(port0);
+    int port0 = vm0.invoke(() -> createServer(-1, 1, 4));
+    vm1.invoke(() -> createAccessorPeer(1, 4));
+    vm2.invoke(() -> createAccessorPeer(1, 4));
+    vm3.invoke(() -> createAccessorServer(1, 4));
+    createClient(250, true, true, true, port0);
 
     putIntoPartitionedRegions();
+    doGets();
 
-    getFromPartitionedRegions();
+    ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService();
 
-    verifyEmptyMetadata();
-
-    verifyEmptyStaticData();
+    assertThat(clientMetadataService.getClientPRMetadata_TEST_ONLY()).isEmpty();
+    assertThat(clientMetadataService.getClientPartitionAttributesMap()).isEmpty();
   }
 
   /**
@@ -239,19 +248,48 @@
    */
   @Test
   public void testMetadataContents() {
-    int port0 = vm0.invoke(() -> createServer(1, 4));
-    int port1 = vm1.invoke(() -> createServer(1, 4));
-    int port2 = vm2.invoke(() -> createServer(1, 4));
-    int port3 = vm3.invoke(() -> createServer(1, 4));
-
-    createClient(port0, port1, port2, port3);
+    int port0 = vm0.invoke(() -> createServer(-1, 1, 4));
+    int port1 = vm1.invoke(() -> createServer(-1, 1, 4));
+    int port2 = vm2.invoke(() -> createServer(-1, 1, 4));
+    int port3 = vm3.invoke(() -> createServer(-1, 1, 4));
+    createClient(100, true, false, true, port0, port1, port2, port3);
 
     putIntoPartitionedRegions();
+    doGets();
 
-    getFromPartitionedRegions();
+    ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService();
 
-    verifyMetadata();
-    updateIntoSinglePR();
+    await().untilAsserted(() -> {
+      assertThat(clientMetadataService.getRefreshTaskCount_TEST_ONLY()).isZero();
+    });
+
+    clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false);
+
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
+
+    partitionedRegion.put(0, "update0");
+    assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
+
+    partitionedRegion.put(1, "update1");
+    assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
+
+    partitionedRegion.put(2, "update2");
+    assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
+
+    partitionedRegion.put(3, "update3");
+    assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
+
+    partitionedRegion.put(0, "update00");
+    assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
+
+    partitionedRegion.put(1, "update11");
+    assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
+
+    partitionedRegion.put(2, "update22");
+    assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
+
+    partitionedRegion.put(3, "update33");
+    assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
   }
 
   /**
@@ -261,21 +299,22 @@
    */
   @Test
   public void testMetadataServiceCallAccuracy() {
-    int port0 = vm0.invoke(() -> createServer(1, 4));
-    int port1 = vm1.invoke(() -> createServer(1, 4));
+    int port0 = vm0.invoke(() -> createServer(-1, 1, 4));
+    int port1 = vm1.invoke(() -> createServer(-1, 1, 4));
+    vm2.invoke(() -> createClient(250, true, true, true, port0));
+    createClient(250, true, true, true, port1);
 
-    vm2.invoke(() -> createClient(port0));
-    createClient(port1);
+    vm2.invoke(() -> doPuts(getRegion(PARTITIONED_REGION_NAME)));
 
-    vm2.invoke(() -> putIntoSinglePR());
-
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
+    ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService();
     clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false);
 
-    testRegion.put(0, "create0");
-    testRegion.put(1, "create1");
-    testRegion.put(2, "create2");
-    testRegion.put(3, "create3");
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
+
+    partitionedRegion.put(0, "create0");
+    partitionedRegion.put(1, "create1");
+    partitionedRegion.put(2, "create2");
+    partitionedRegion.put(3, "create3");
 
     await().untilAsserted(() -> {
       assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isTrue();
@@ -288,10 +327,10 @@
 
     clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false);
 
-    testRegion.put(0, "create0");
-    testRegion.put(1, "create1");
-    testRegion.put(2, "create2");
-    testRegion.put(3, "create3");
+    partitionedRegion.put(0, "create0");
+    partitionedRegion.put(1, "create1");
+    partitionedRegion.put(2, "create2");
+    partitionedRegion.put(3, "create3");
 
     await().untilAsserted(() -> {
       assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
@@ -300,21 +339,22 @@
 
   @Test
   public void testMetadataServiceCallAccuracy_FromDestroyOp() {
-    int port0 = vm0.invoke(() -> createServer(0, 4));
-    int port1 = vm1.invoke(() -> createServer(0, 4));
+    int port0 = vm0.invoke(() -> createServer(-1, 0, 4));
+    int port1 = vm1.invoke(() -> createServer(-1, 0, 4));
+    vm2.invoke(() -> createClient(250, true, true, true, port0));
+    createClient(250, true, true, true, port1);
 
-    vm2.invoke(() -> createClient(port0));
-    createClient(port1);
+    vm2.invoke(() -> doPuts(getRegion(PARTITIONED_REGION_NAME)));
 
-    vm2.invoke(() -> putIntoSinglePR());
-
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
+    ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService();
     clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false);
 
-    testRegion.destroy(0);
-    testRegion.destroy(1);
-    testRegion.destroy(2);
-    testRegion.destroy(3);
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
+
+    partitionedRegion.destroy(0);
+    partitionedRegion.destroy(1);
+    partitionedRegion.destroy(2);
+    partitionedRegion.destroy(3);
 
     await().untilAsserted(() -> {
       assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isTrue();
@@ -323,21 +363,22 @@
 
   @Test
   public void testMetadataServiceCallAccuracy_FromGetOp() {
-    int port0 = vm0.invoke(() -> createServer(0, 4));
-    int port1 = vm1.invoke(() -> createServer(0, 4));
+    int port0 = vm0.invoke(() -> createServer(-1, 0, 4));
+    int port1 = vm1.invoke(() -> createServer(-1, 0, 4));
+    vm2.invoke(() -> createClient(250, true, true, true, port0));
+    createClient(250, true, true, true, port1);
 
-    vm2.invoke(() -> createClient(port0));
-    createClient(port1);
+    vm2.invoke(() -> doPuts(getRegion(PARTITIONED_REGION_NAME)));
 
-    vm2.invoke(() -> putIntoSinglePR());
-
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
+    ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService();
     clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false);
 
-    testRegion.get(0);
-    testRegion.get(1);
-    testRegion.get(2);
-    testRegion.get(3);
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
+
+    partitionedRegion.get(0);
+    partitionedRegion.get(1);
+    partitionedRegion.get(2);
+    partitionedRegion.get(3);
 
     await().untilAsserted(() -> {
       assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isTrue();
@@ -345,10 +386,10 @@
 
     clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false);
 
-    testRegion.get(0);
-    testRegion.get(1);
-    testRegion.get(2);
-    testRegion.get(3);
+    partitionedRegion.get(0);
+    partitionedRegion.get(1);
+    partitionedRegion.get(2);
+    partitionedRegion.get(3);
 
     await().untilAsserted(() -> {
       assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
@@ -357,24 +398,25 @@
 
   @Test
   public void testSingleHopWithHA() {
-    int port0 = vm0.invoke(() -> createServer(0, 8));
-    int port1 = vm1.invoke(() -> createServer(0, 8));
-    int port2 = vm2.invoke(() -> createServer(0, 8));
-    int port3 = vm3.invoke(() -> createServer(0, 8));
+    int port0 = vm0.invoke(() -> createServer(-1, 0, 8));
+    int port1 = vm1.invoke(() -> createServer(-1, 0, 8));
+    int port2 = vm2.invoke(() -> createServer(-1, 0, 8));
+    int port3 = vm3.invoke(() -> createServer(-1, 0, 8));
+    createClient(100, true, false, true, port0, port1, port2, port3);
 
-    createClient(port0, port1, port2, port3);
-
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
+    ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService();
     clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false);
 
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
+
     // put
     for (int i = 1; i <= 16; i++) {
-      testRegion.put(i, i);
+      partitionedRegion.put(i, i);
     }
 
     // update
     for (int i = 1; i <= 16; i++) {
-      testRegion.put(i, i + 1);
+      partitionedRegion.put(i, i + 1);
     }
 
     await().untilAsserted(() -> {
@@ -386,64 +428,59 @@
 
     // again update
     for (int i = 1; i <= 16; i++) {
-      testRegion.put(i, i + 10);
+      partitionedRegion.put(i, i + 10);
     }
   }
 
   @Test
   public void testSingleHopWithHAWithLocator() {
-    int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    String locator = "localhost[" + port3 + "]";
+    int locatorPort = vm3.invoke(() -> startLocator());
+    String locators = "localhost[" + locatorPort + "]";
 
-    vm3.invoke(() -> startLocatorInVM(port3));
+    vm0.invoke(() -> createServer(locators, null, LOCAL_MAX_MEMORY_DEFAULT, 0, 8));
+    vm1.invoke(() -> createServer(locators, null, LOCAL_MAX_MEMORY_DEFAULT, 0, 8));
+    vm2.invoke(() -> createServer(locators, null, LOCAL_MAX_MEMORY_DEFAULT, 0, 8));
+    createClient(250, true, true, false, locatorPort);
 
-    try {
-      vm0.invoke(() -> createServerWithLocator(locator));
-      vm1.invoke(() -> createServerWithLocator(locator));
-      vm2.invoke(() -> createServerWithLocator(locator));
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
 
-      createClientWithLocator("localhost", port3);
+    // put
+    for (int i = 1; i <= 16; i++) {
+      partitionedRegion.put(i, i);
+    }
 
-      // put
-      for (int i = 1; i <= 16; i++) {
-        testRegion.put(i, i);
-      }
+    // update
+    for (int i = 1; i <= 16; i++) {
+      partitionedRegion.put(i, i + 1);
+    }
 
-      // update
-      for (int i = 1; i <= 16; i++) {
-        testRegion.put(i, i + 1);
-      }
+    // kill server
+    vm0.invoke(() -> stopServer());
 
-      // kill server
-      vm0.invoke(() -> stopServer());
-
-      // again update
-      for (int i = 1; i <= 16; i++) {
-        testRegion.put(i, i + 10);
-      }
-
-    } finally {
-      vm3.invoke(() -> stopLocator());
+    // again update
+    for (int i = 1; i <= 16; i++) {
+      partitionedRegion.put(i, i + 10);
     }
   }
 
   @Test
   public void testNoMetadataServiceCall_ForGetOp() {
-    int port0 = vm0.invoke(() -> createServer(0, 4));
-    int port1 = vm1.invoke(() -> createServer(0, 4));
+    int port0 = vm0.invoke(() -> createServer(-1, 0, 4));
+    int port1 = vm1.invoke(() -> createServer(-1, 0, 4));
+    vm2.invoke(() -> createClient(250, false, true, true, port0));
+    createClient(250, false, true, true, port1);
 
-    vm2.invoke(() -> createClientWithoutPRSingleHopEnabled(port0));
-    createClientWithoutPRSingleHopEnabled(port1);
-
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
+    ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService();
     clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false);
 
-    vm2.invoke(() -> putIntoSinglePR());
+    vm2.invoke(() -> doPuts(getRegion(PARTITIONED_REGION_NAME)));
 
-    testRegion.get(0);
-    testRegion.get(1);
-    testRegion.get(2);
-    testRegion.get(3);
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
+
+    partitionedRegion.get(0);
+    partitionedRegion.get(1);
+    partitionedRegion.get(2);
+    partitionedRegion.get(3);
 
     await().untilAsserted(() -> {
       assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
@@ -451,10 +488,10 @@
 
     clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false);
 
-    testRegion.get(0);
-    testRegion.get(1);
-    testRegion.get(2);
-    testRegion.get(3);
+    partitionedRegion.get(0);
+    partitionedRegion.get(1);
+    partitionedRegion.get(2);
+    partitionedRegion.get(3);
 
     await().untilAsserted(() -> {
       assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
@@ -463,30 +500,31 @@
 
   @Test
   public void testNoMetadataServiceCall() {
-    int port0 = vm0.invoke(() -> createServer(1, 4));
-    int port1 = vm1.invoke(() -> createServer(1, 4));
+    int port0 = vm0.invoke(() -> createServer(-1, 1, 4));
+    int port1 = vm1.invoke(() -> createServer(-1, 1, 4));
+    vm2.invoke(() -> createClient(250, false, true, true, port0));
+    createClient(250, false, true, true, port1);
 
-    vm2.invoke(() -> createClientWithoutPRSingleHopEnabled(port0));
-    createClientWithoutPRSingleHopEnabled(port1);
+    vm2.invoke(() -> doPuts(getRegion(PARTITIONED_REGION_NAME)));
 
-    vm2.invoke(() -> putIntoSinglePR());
+    ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService();
 
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
 
     clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false);
-    testRegion.put(0, "create0");
+    partitionedRegion.put(0, "create0");
     boolean metadataRefreshed_get1 = clientMetadataService.isRefreshMetadataTestOnly();
 
     clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false);
-    testRegion.put(1, "create1");
+    partitionedRegion.put(1, "create1");
     boolean metadataRefreshed_get2 = clientMetadataService.isRefreshMetadataTestOnly();
 
     clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false);
-    testRegion.put(2, "create2");
+    partitionedRegion.put(2, "create2");
     boolean metadataRefreshed_get3 = clientMetadataService.isRefreshMetadataTestOnly();
 
     clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false);
-    testRegion.put(3, "create3");
+    partitionedRegion.put(3, "create3");
     boolean metadataRefreshed_get4 = clientMetadataService.isRefreshMetadataTestOnly();
 
     await().untilAsserted(() -> {
@@ -498,10 +536,10 @@
 
     clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false);
 
-    testRegion.put(0, "create0");
-    testRegion.put(1, "create1");
-    testRegion.put(2, "create2");
-    testRegion.put(3, "create3");
+    partitionedRegion.put(0, "create0");
+    partitionedRegion.put(1, "create1");
+    partitionedRegion.put(2, "create2");
+    partitionedRegion.put(3, "create3");
 
     await().untilAsserted(() -> {
       assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
@@ -510,21 +548,22 @@
 
   @Test
   public void testNoMetadataServiceCall_ForDestroyOp() {
-    int port0 = vm0.invoke(() -> createServer(0, 4));
-    int port1 = vm1.invoke(() -> createServer(0, 4));
+    int port0 = vm0.invoke(() -> createServer(-1, 0, 4));
+    int port1 = vm1.invoke(() -> createServer(-1, 0, 4));
+    vm2.invoke(() -> createClient(250, false, true, true, port0));
+    createClient(250, false, true, true, port1);
 
-    vm2.invoke(() -> createClientWithoutPRSingleHopEnabled(port0));
-    createClientWithoutPRSingleHopEnabled(port1);
+    vm2.invoke(() -> doPuts(getRegion(PARTITIONED_REGION_NAME)));
 
-    vm2.invoke(() -> putIntoSinglePR());
-
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
+    ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService();
     clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false);
 
-    testRegion.destroy(0);
-    testRegion.destroy(1);
-    testRegion.destroy(2);
-    testRegion.destroy(3);
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
+
+    partitionedRegion.destroy(0);
+    partitionedRegion.destroy(1);
+    partitionedRegion.destroy(2);
+    partitionedRegion.destroy(3);
 
     await().untilAsserted(() -> {
       assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
@@ -534,36 +573,41 @@
   @Test
   public void testServerLocationRemovalThroughPing() throws Exception {
     LATCH.set(new CountDownLatch(2));
+
     int redundantCopies = 3;
     int totalNumberOfBuckets = 4;
 
-    int port0 = vm0.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets));
-    int port1 = vm1.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets));
-    int port2 = vm2.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets));
-    int port3 = vm3.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets));
+    int port0 = vm0.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets));
+    int port1 = vm1.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets));
+    int port2 = vm2.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets));
+    int port3 = vm3.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets));
+    createOldClient(100, true, false, true, port0, port1, port2, port3);
 
-    createClient(port0, port1, port2, port3);
-
-    ManagementService managementService = getExistingManagementService(cache);
+    ManagementService managementService = getExistingManagementService(CACHE.get());
     new MemberCrashedListener(LATCH.get()).registerMembershipListener(managementService);
 
     putIntoPartitionedRegions();
-    getFromPartitionedRegions();
+    doGets();
 
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
+    ClientMetadataService clientMetadataService = CACHE.get().getClientMetadataService();
     Map<String, ClientPartitionAdvisor> clientPRMetadata =
         clientMetadataService.getClientPRMetadata_TEST_ONLY();
 
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
+    Region<Object, Object> customerRegion = getRegion(CUSTOMER_REGION_NAME);
+    Region<Object, Object> orderRegion = getRegion(ORDER_REGION_NAME);
+    Region<Object, Object> shipmentRegion = getRegion(SHIPMENT_REGION_NAME);
+
     await().untilAsserted(() -> {
       assertThat(clientPRMetadata)
           .hasSize(4)
-          .containsKey(testRegion.getFullPath())
+          .containsKey(partitionedRegion.getFullPath())
           .containsKey(customerRegion.getFullPath())
           .containsKey(orderRegion.getFullPath())
           .containsKey(shipmentRegion.getFullPath());
     });
 
-    ClientPartitionAdvisor prMetadata = clientPRMetadata.get(testRegion.getFullPath());
+    ClientPartitionAdvisor prMetadata = clientPRMetadata.get(partitionedRegion.getFullPath());
     assertThat(prMetadata.getBucketServerLocationsMap_TEST_ONLY()).hasSize(totalNumberOfBuckets);
 
     for (Entry entry : prMetadata.getBucketServerLocationsMap_TEST_ONLY().entrySet()) {
@@ -575,43 +619,45 @@
 
     LATCH.get().await(getTimeout().getValueInMS(), MILLISECONDS);
 
-    getFromPartitionedRegions();
+    doGets();
 
     verifyDeadServer(clientPRMetadata, customerRegion, port0, port1);
-    verifyDeadServer(clientPRMetadata, testRegion, port0, port1);
+    verifyDeadServer(clientPRMetadata, partitionedRegion, port0, port1);
   }
 
   @Test
   public void testMetadataFetchOnlyThroughFunctions() {
     // Workaround for 52004
-    addIgnoredException("InternalFunctionInvocationTargetException");
+    addIgnoredException(InternalFunctionInvocationTargetException.class);
+
     int redundantCopies = 3;
     int totalNumberOfBuckets = 4;
 
-    int port0 = vm0.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets));
-    int port1 = vm1.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets));
-    int port2 = vm2.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets));
-    int port3 = vm3.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets));
+    int port0 = vm0.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets));
+    int port1 = vm1.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets));
+    int port2 = vm2.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets));
+    int port3 = vm3.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets));
+    createClient(100, true, false, true, port0, port1, port2, port3);
 
-    createClient(port0, port1, port2, port3);
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
 
-    executeFunctions();
+    executeFunctions(partitionedRegion);
 
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
+    ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService();
     Map<String, ClientPartitionAdvisor> clientPRMetadata =
         clientMetadataService.getClientPRMetadata_TEST_ONLY();
 
     await().untilAsserted(() -> {
-      assertThat(clientPRMetadata).hasSize(1);
+      assertThat(clientPRMetadata)
+          .hasSize(1)
+          .containsKey(partitionedRegion.getFullPath());
     });
 
-    assertThat(clientPRMetadata).containsKey(testRegion.getFullPath());
-
-    ClientPartitionAdvisor prMetadata = clientPRMetadata.get(testRegion.getFullPath());
+    ClientPartitionAdvisor prMetadata = clientPRMetadata.get(partitionedRegion.getFullPath());
 
     await().untilAsserted(() -> {
+      clientMetadataService.getClientPRMetadata((InternalRegion) partitionedRegion);
       assertThat(prMetadata.getBucketServerLocationsMap_TEST_ONLY()).hasSize(totalNumberOfBuckets);
-      clientMetadataService.getClientPRMetadata((InternalRegion) testRegion);
     });
   }
 
@@ -620,61 +666,63 @@
     int redundantCopies = 3;
     int totalNumberOfBuckets = 4;
 
-    int port0 = vm0.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets));
-    int port1 = vm1.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets));
-    int port2 = vm2.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets));
-    int port3 = vm3.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets));
+    int port0 = vm0.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets));
+    int port1 = vm1.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets));
+    int port2 = vm2.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets));
+    int port3 = vm3.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets));
+    createClient(100, true, false, true, port0, port1, port2, port3);
 
-    createClient(port0, port1, port2, port3);
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
 
-    putAll();
+    doPutAlls(partitionedRegion);
 
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
+    ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService();
     Map<String, ClientPartitionAdvisor> clientPRMetadata =
         clientMetadataService.getClientPRMetadata_TEST_ONLY();
+    ClientPartitionAdvisor prMetadata = clientPRMetadata.get(partitionedRegion.getFullPath());
 
     await().untilAsserted(() -> {
-      assertThat(clientPRMetadata).hasSize(1);
-    });
-
-    assertThat(clientPRMetadata).containsKey(testRegion.getFullPath());
-
-    ClientPartitionAdvisor prMetadata = clientPRMetadata.get(testRegion.getFullPath());
-
-    await().untilAsserted(() -> {
-      assertThat(prMetadata.getBucketServerLocationsMap_TEST_ONLY()).hasSize(totalNumberOfBuckets);
+      assertThat(clientPRMetadata)
+          .hasSize(1)
+          .containsKey(partitionedRegion.getFullPath());
+      assertThat(prMetadata.getBucketServerLocationsMap_TEST_ONLY())
+          .hasSize(totalNumberOfBuckets);
     });
   }
 
   @Test
   public void testMetadataIsSameOnAllServersAndClients() {
+    int locatorPort = DUnitEnv.get().getLocatorPort();
+    String locators = "localhost[" + locatorPort + "]";
+
     int redundantCopies = 3;
     int totalNumberOfBuckets = 4;
 
-    int port0 = vm0.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets));
-    int port1 = vm1.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets));
-    int port2 = vm2.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets));
-    int port3 = vm3.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets));
+    vm0.invoke(() -> createServer(locators, null, -1, redundantCopies, totalNumberOfBuckets));
+    vm1.invoke(() -> createServer(locators, null, -1, redundantCopies, totalNumberOfBuckets));
+    vm2.invoke(() -> createServer(locators, null, -1, redundantCopies, totalNumberOfBuckets));
+    vm3.invoke(() -> createServer(locators, null, -1, redundantCopies, totalNumberOfBuckets));
+    createClient(100, true, false, false, locatorPort);
 
-    createClient(port0, port1, port2, port3);
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
 
-    put();
+    doManyPuts(partitionedRegion);
 
     for (VM vm : asList(vm0, vm1, vm2, vm3)) {
       vm.invoke(() -> waitForLocalBucketsCreation());
     }
 
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
-    clientMetadataService.getClientPRMetadata((InternalRegion) testRegion);
+    ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService();
+    clientMetadataService.getClientPRMetadata((InternalRegion) partitionedRegion);
 
     Map<String, ClientPartitionAdvisor> clientPRMetadata =
         clientMetadataService.getClientPRMetadata_TEST_ONLY();
 
     assertThat(clientPRMetadata)
         .hasSize(1)
-        .containsKey(testRegion.getFullPath());
+        .containsKey(partitionedRegion.getFullPath());
 
-    ClientPartitionAdvisor prMetadata = clientPRMetadata.get(testRegion.getFullPath());
+    ClientPartitionAdvisor prMetadata = clientPRMetadata.get(partitionedRegion.getFullPath());
     Map<Integer, List<BucketServerLocation66>> clientBucketMap =
         prMetadata.getBucketServerLocationsMap_TEST_ONLY();
 
@@ -693,25 +741,25 @@
     vm0.invoke(() -> stopServer());
     vm1.invoke(() -> stopServer());
 
-    vm0.invoke(() -> startServerOnPort(port0));
-    vm1.invoke(() -> startServerOnPort(port1));
+    vm0.invoke(() -> startServer());
+    vm1.invoke(() -> startServer());
 
-    put();
+    doManyPuts(partitionedRegion);
 
     for (VM vm : asList(vm0, vm1, vm2, vm3)) {
       vm.invoke(() -> waitForLocalBucketsCreation());
     }
 
-    await().alias("bucket copies are not created").untilAsserted(() -> {
+    await().atMost(2, SECONDS).untilAsserted(() -> {
       Map<String, ClientPartitionAdvisor> clientPRMetadata_await =
           clientMetadataService.getClientPRMetadata_TEST_ONLY();
 
       assertThat(clientPRMetadata_await)
           .hasSize(1)
-          .containsKey(testRegion.getFullPath());
+          .containsKey(partitionedRegion.getFullPath());
 
       ClientPartitionAdvisor prMetadata_await =
-          clientPRMetadata_await.get(testRegion.getFullPath());
+          clientPRMetadata_await.get(partitionedRegion.getFullPath());
       Map<Integer, List<BucketServerLocation66>> clientBucketMap_await =
           prMetadata_await.getBucketServerLocationsMap_TEST_ONLY();
 
@@ -722,20 +770,20 @@
       }
     });
 
-    clientMetadataService.getClientPRMetadata((InternalRegion) testRegion);
+    clientMetadataService.getClientPRMetadata((InternalRegion) partitionedRegion);
 
     clientPRMetadata = clientMetadataService.getClientPRMetadata_TEST_ONLY();
 
     assertThat(clientPRMetadata)
         .hasSize(1)
-        .containsKey(testRegion.getFullPath());
+        .containsKey(partitionedRegion.getFullPath());
 
-    prMetadata = clientPRMetadata.get(testRegion.getFullPath());
+    prMetadata = clientPRMetadata.get(partitionedRegion.getFullPath());
 
     Map<Integer, List<BucketServerLocation66>> clientBucketMap2 =
         prMetadata.getBucketServerLocationsMap_TEST_ONLY();
 
-    await().alias("expected no metadata to be refreshed").untilAsserted(() -> {
+    await().untilAsserted(() -> {
       assertThat(clientBucketMap2).hasSize(totalNumberOfBuckets);
     });
 
@@ -747,33 +795,34 @@
       vm.invoke(() -> verifyMetadata(clientBucketMap));
     }
 
-    vm0.invoke(() -> cacheRule.closeAndNullCache());
-    vm1.invoke(() -> cacheRule.closeAndNullCache());
+    for (VM vm : asList(vm0, vm1)) {
+      vm.invoke(() -> {
+        SERVER.getAndSet(DUMMY_SERVER).stop();
+      });
+    }
 
-    put();
+    doManyPuts(partitionedRegion);
 
     vm2.invoke(() -> {
-      PartitionedRegion pr = (PartitionedRegion) testRegion;
+      PartitionedRegion pr = (PartitionedRegion) getRegion(PARTITIONED_REGION_NAME);
       pr.getRegionAdvisor().getAllClientBucketProfilesTest();
     });
-
     vm3.invoke(() -> {
-      PartitionedRegion pr = (PartitionedRegion) testRegion;
+      PartitionedRegion pr = (PartitionedRegion) getRegion(PARTITIONED_REGION_NAME);
       pr.getRegionAdvisor().getAllClientBucketProfilesTest();
     });
 
     vm2.invoke(() -> waitForLocalBucketsCreation());
     vm3.invoke(() -> waitForLocalBucketsCreation());
 
-    clientMetadataService.getClientPRMetadata((InternalRegion) testRegion);
-
+    clientMetadataService.getClientPRMetadata((InternalRegion) partitionedRegion);
     clientPRMetadata = clientMetadataService.getClientPRMetadata_TEST_ONLY();
 
     assertThat(clientPRMetadata)
         .hasSize(1)
-        .containsKey(testRegion.getFullPath());
+        .containsKey(partitionedRegion.getFullPath());
 
-    prMetadata = clientPRMetadata.get(testRegion.getFullPath());
+    prMetadata = clientPRMetadata.get(partitionedRegion.getFullPath());
     Map<Integer, List<BucketServerLocation66>> clientBucketMap3 =
         prMetadata.getBucketServerLocationsMap_TEST_ONLY();
 
@@ -795,36 +844,35 @@
   public void testMetadataIsSameOnAllServersAndClientsHA() {
     int totalNumberOfBuckets = 4;
 
-    int port0 = vm0.invoke(() -> createServer(2, totalNumberOfBuckets));
-    int port1 = vm1.invoke(() -> createServer(2, totalNumberOfBuckets));
+    int port0 = vm0.invoke(() -> createServer(-1, 2, totalNumberOfBuckets));
+    int port1 = vm1.invoke(() -> createServer(-1, 2, totalNumberOfBuckets));
+    createClient(100, true, false, true, port0, port1, port0, port1);
 
-    createClient(port0, port1, port0, port1);
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
 
-    put();
+    doManyPuts(partitionedRegion);
 
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
-    clientMetadataService.getClientPRMetadata((InternalRegion) testRegion);
-
+    ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService();
+    clientMetadataService.getClientPRMetadata((InternalRegion) partitionedRegion);
     Map<String, ClientPartitionAdvisor> clientPRMetadata =
         clientMetadataService.getClientPRMetadata_TEST_ONLY();
 
     await().untilAsserted(() -> {
-      assertThat(clientPRMetadata).hasSize(1);
+      assertThat(clientPRMetadata)
+          .hasSize(1)
+          .containsKey(partitionedRegion.getFullPath());
     });
 
-    assertThat(clientPRMetadata).containsKey(testRegion.getFullPath());
-
     vm0.invoke(() -> {
-      PartitionedRegion pr = (PartitionedRegion) testRegion;
+      PartitionedRegion pr = (PartitionedRegion) getRegion(PARTITIONED_REGION_NAME);
       pr.getRegionAdvisor().getAllClientBucketProfilesTest();
     });
-
     vm1.invoke(() -> {
-      PartitionedRegion pr = (PartitionedRegion) testRegion;
+      PartitionedRegion pr = (PartitionedRegion) getRegion(PARTITIONED_REGION_NAME);
       pr.getRegionAdvisor().getAllClientBucketProfilesTest();
     });
 
-    ClientPartitionAdvisor prMetadata = clientPRMetadata.get(testRegion.getFullPath());
+    ClientPartitionAdvisor prMetadata = clientPRMetadata.get(partitionedRegion.getFullPath());
     Map<Integer, List<BucketServerLocation66>> clientBucketMap =
         prMetadata.getBucketServerLocationsMap_TEST_ONLY();
 
@@ -841,9 +889,9 @@
 
     vm0.invoke(() -> stopServer());
 
-    put();
+    doManyPuts(partitionedRegion);
 
-    clientMetadataService.getClientPRMetadata((InternalRegion) testRegion);
+    clientMetadataService.getClientPRMetadata((InternalRegion) partitionedRegion);
 
     assertThat(clientBucketMap).hasSize(totalNumberOfBuckets);
 
@@ -853,9 +901,9 @@
 
     assertThat(clientPRMetadata)
         .hasSize(1)
-        .containsKey(testRegion.getFullPath());
-
-    assertThat(clientBucketMap).hasSize(totalNumberOfBuckets);
+        .containsKey(partitionedRegion.getFullPath());
+    assertThat(clientBucketMap)
+        .hasSize(totalNumberOfBuckets);
 
     await().untilAsserted(() -> {
       for (Entry<Integer, List<BucketServerLocation66>> entry : clientBucketMap.entrySet()) {
@@ -868,10 +916,12 @@
   public void testClientMetadataForPersistentPrs() throws Exception {
     LATCH.set(new CountDownLatch(4));
 
-    int port0 = vm0.invoke(() -> createPersistentPrsAndServer());
-    int port1 = vm1.invoke(() -> createPersistentPrsAndServer());
-    int port2 = vm2.invoke(() -> createPersistentPrsAndServer());
-    int port3 = vm3.invoke(() -> createPersistentPrsAndServer());
+    int locatorPort = DUnitEnv.get().getLocatorPort();
+
+    vm0.invoke(() -> createServer("disk", -1, 3, 4));
+    vm1.invoke(() -> createServer("disk", -1, 3, 4));
+    vm2.invoke(() -> createServer("disk", -1, 3, 4));
+    vm3.invoke(() -> createServer("disk", -1, 3, 4));
 
     vm3.invoke(() -> putIntoPartitionedRegions());
 
@@ -879,108 +929,499 @@
       vm.invoke(() -> waitForLocalBucketsCreation());
     }
 
-    createClient(port0, port1, port2, port3);
+    createOldClient(100, true, false, false, locatorPort);
 
-    ManagementService managementService = getExistingManagementService(cache);
-    MemberCrashedListener listener = new MemberCrashedListener(LATCH.get());
-    listener.registerMembershipListener(managementService);
+    ManagementService managementService = getExistingManagementService(CACHE.get());
+    new MemberCrashedListener(LATCH.get()).registerMembershipListener(managementService);
+    ClientMetadataService clientMetadataService = CACHE.get().getClientMetadataService();
 
-    await().until(() -> fetchAndValidateMetadata());
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
+
+    await().until(() -> {
+      clientMetadataService.getClientPRMetadata((InternalRegion) partitionedRegion);
+
+      Map<ServerLocation, Set<Integer>> serverBucketMap =
+          clientMetadataService.groupByServerToAllBuckets(partitionedRegion, true);
+
+      return serverBucketMap != null;
+    });
 
     for (VM vm : asList(vm0, vm1, vm2, vm3)) {
-      vm.invoke(() -> cacheRule.closeAndNullCache());
+      vm.invoke(() -> {
+        SERVER.getAndSet(DUMMY_SERVER).stop();
+      });
     }
 
     LATCH.get().await(getTimeout().getValueInMS(), MILLISECONDS);
 
-    AsyncInvocation<Void> createServerOnVM3 =
-        vm3.invokeAsync(() -> createPersistentPrsAndServerOnPort(port3));
-    AsyncInvocation<Void> createServerOnVM2 =
-        vm2.invokeAsync(() -> createPersistentPrsAndServerOnPort(port2));
-    AsyncInvocation<Void> createServerOnVM1 =
-        vm1.invokeAsync(() -> createPersistentPrsAndServerOnPort(port1));
-    AsyncInvocation<Void> createServerOnVM0 =
-        vm0.invokeAsync(() -> createPersistentPrsAndServerOnPort(port0));
+    AsyncInvocation<Integer> createServerOnVM3 =
+        vm3.invokeAsync(() -> createServer("disk", -1, 3, 4));
+    AsyncInvocation<Integer> createServerOnVM2 =
+        vm2.invokeAsync(() -> createServer("disk", -1, 3, 4));
+    AsyncInvocation<Integer> createServerOnVM1 =
+        vm1.invokeAsync(() -> createServer("disk", -1, 3, 4));
+    AsyncInvocation<Integer> createServerOnVM0 =
+        vm0.invokeAsync(() -> createServer("disk", -1, 3, 4));
 
     createServerOnVM3.await();
     createServerOnVM2.await();
     createServerOnVM1.await();
     createServerOnVM0.await();
 
-    fetchAndValidateMetadata();
+    await().untilAsserted(() -> {
+      clientMetadataService.getClientPRMetadata((InternalRegion) partitionedRegion);
+      Map<ServerLocation, Set<Integer>> serverBucketMap =
+          clientMetadataService.groupByServerToAllBuckets(partitionedRegion, true);
+
+      assertThat(serverBucketMap).hasSize(4);
+    });
   }
 
-  private boolean fetchAndValidateMetadata() {
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
-    clientMetadataService.getClientPRMetadata((InternalRegion) testRegion);
-
-    Map<ServerLocation, Set<Integer>> serverBucketMap =
-        clientMetadataService.groupByServerToAllBuckets(testRegion, true);
-
-    return serverBucketMap != null;
+  private int createServer(int localMaxMemory, int redundantCopies, int totalNumberOfBuckets)
+      throws IOException {
+    return createServer(null, null, localMaxMemory, redundantCopies, totalNumberOfBuckets);
   }
 
-  private void stopServer() {
-    for (CacheServer cacheServer : cache.getCacheServers()) {
-      cacheServer.stop();
+  private int createServer(String diskStoreName, int localMaxMemory, int redundantCopies,
+      int totalNumberOfBuckets) throws IOException {
+    return createServer(null, diskStoreName, localMaxMemory, redundantCopies, totalNumberOfBuckets);
+  }
+
+  private int createServer(String locators, String diskStoreName, int localMaxMemory,
+      int redundantCopies, int totalNumberOfBuckets) throws IOException {
+    return doCreateServer(locators, 0, diskStoreName, localMaxMemory, LOCAL_MAX_MEMORY_DEFAULT,
+        redundantCopies, totalNumberOfBuckets);
+  }
+
+  private int createAccessorServer(int redundantCopies, int totalNumberOfBuckets)
+      throws IOException {
+    return doCreateServer(null, 0, null, 0, 0, redundantCopies, totalNumberOfBuckets);
+  }
+
+  private void createAccessorPeer(int redundantCopies, int totalNumberOfBuckets)
+      throws IOException {
+    ServerLauncher serverLauncher = new ServerLauncher.Builder()
+        .setDeletePidFileOnStop(true)
+        .setDisableDefaultServer(true)
+        .setWorkingDirectory(getWorkingDirectory())
+        .set(getDistributedSystemProperties())
+        .build();
+    serverLauncher.start();
+
+    SERVER.set(serverLauncher);
+
+    createRegions(null, -1, -1, redundantCopies, totalNumberOfBuckets);
+  }
+
+  private int doCreateServer(String locators, int serverPortInput, String diskStoreName,
+      int localMaxMemory, int localMaxMemoryOthers, int redundantCopies, int totalNumberOfBuckets)
+      throws IOException {
+    ServerLauncher.Builder serverBuilder = new ServerLauncher.Builder()
+        .setDeletePidFileOnStop(true)
+        .setDisableDefaultServer(true)
+        .setWorkingDirectory(getWorkingDirectory())
+        .set(getDistributedSystemProperties());
+
+    if (locators != null) {
+      serverBuilder.set(LOCATORS, locators);
     }
+
+    ServerLauncher serverLauncher = serverBuilder.build();
+    serverLauncher.start();
+
+    SERVER.set(serverLauncher);
+
+    CacheServer cacheServer = serverLauncher.getCache().addCacheServer();
+    cacheServer.setHostnameForClients("localhost");
+    cacheServer.setPort(serverPortInput);
+    cacheServer.start();
+
+    int serverPort = cacheServer.getPort();
+
+    createRegions(diskStoreName, localMaxMemory, localMaxMemoryOthers, redundantCopies,
+        totalNumberOfBuckets);
+
+    return serverPort;
   }
 
-  private void startLocatorInVM(int locatorPort) throws IOException {
-    Properties properties = new Properties();
-    properties.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
+  private void createRegions(String diskStoreName, int localMaxMemory, int localMaxMemoryOthers,
+      int redundantCopies, int totalNumberOfBuckets) throws IOException {
+    createPartitionedRegion(PARTITIONED_REGION_NAME, null, diskStoreName, localMaxMemory, null,
+        redundantCopies, totalNumberOfBuckets);
 
-    File logFile = new File("locator-" + locatorPort + ".log");
+    createPartitionedRegion(CUSTOMER_REGION_NAME, null, diskStoreName, localMaxMemoryOthers,
+        new CustomerIdPartitionResolver<>(), redundantCopies, totalNumberOfBuckets);
 
-    locator = Locator.startLocatorAndDS(locatorPort, logFile, null, properties);
+    createPartitionedRegion(ORDER_REGION_NAME, CUSTOMER_REGION_NAME, diskStoreName,
+        localMaxMemoryOthers,
+        new CustomerIdPartitionResolver<>(), redundantCopies, totalNumberOfBuckets);
+
+    createPartitionedRegion(SHIPMENT_REGION_NAME, ORDER_REGION_NAME, diskStoreName,
+        localMaxMemoryOthers,
+        new CustomerIdPartitionResolver<>(), redundantCopies, totalNumberOfBuckets);
+
+    SERVER.get().getCache().createRegionFactory().create(REPLICATE_REGION_NAME);
   }
 
-  private void stopLocator() {
-    locator.stop();
+  private void createClient(int pingInterval, boolean prSingleHopEnabled,
+      boolean subscriptionEnabled, boolean useServerPool, int... ports) {
+    CLIENT.set((InternalClientCache) new ClientCacheFactory().set(LOCATORS, "").create());
+    String poolName =
+        createPool(pingInterval, prSingleHopEnabled, subscriptionEnabled, useServerPool, ports);
+    createRegionsInClientCache(poolName);
   }
 
-  private int createServerWithLocator(String locators) throws IOException {
-    Properties properties = new Properties();
-    properties.setProperty(LOCATORS, locators);
+  private void createOldClient(int pingInterval, boolean prSingleHopEnabled,
+      boolean subscriptionEnabled, boolean useServerPool, int... ports) {
+    CACHE.set((InternalCache) new CacheFactory().set(LOCATORS, "").create());
+    String poolName =
+        createPool(pingInterval, prSingleHopEnabled, subscriptionEnabled, useServerPool, ports);
+    createRegionsInOldClient(poolName);
+  }
 
-    cache = cacheRule.getOrCreateCache(properties);
+  private String createPool(long pingInterval, boolean prSingleHopEnabled,
+      boolean subscriptionEnabled, boolean useServerPool, int... ports) {
+    System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true");
+    String poolName = PARTITIONED_REGION_NAME;
+    try {
+      PoolFactory poolFactory = PoolManager.createFactory()
+          .setMaxConnections(10)
+          .setMinConnections(6)
+          .setPingInterval(pingInterval)
+          .setPRSingleHopEnabled(prSingleHopEnabled)
+          .setReadTimeout(2000)
+          .setRetryAttempts(3)
+          .setSocketBufferSize(1000)
+          .setSubscriptionEnabled(subscriptionEnabled)
+          .setSubscriptionRedundancy(-1);
 
-    CacheServer cacheServer = cache.addCacheServer();
+      if (useServerPool) {
+        for (int port : ports) {
+          poolFactory.addServer("localhost", port);
+        }
+      } else {
+        for (int port : ports) {
+          poolFactory.addLocator("localhost", port);
+        }
+      }
+
+      poolFactory.create(poolName);
+    } finally {
+      System.clearProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints");
+    }
+
+    return poolName;
+  }
+
+  private <K, V> Region<K, V> createPartitionedRegion(String regionName, String colocatedRegionName,
+      String diskStoreName, int localMaxMemory, PartitionResolver<K, V> partitionResolver,
+      int redundantCopies, int totalNumberOfBuckets) throws IOException {
+    InternalCache cache = getCache();
+
+    PartitionAttributesFactory<K, V> paf = new PartitionAttributesFactory<K, V>()
+        .setRedundantCopies(redundantCopies)
+        .setTotalNumBuckets(totalNumberOfBuckets);
+
+    if (colocatedRegionName != null) {
+      paf.setColocatedWith(colocatedRegionName);
+    }
+    if (localMaxMemory > -1) {
+      paf.setLocalMaxMemory(localMaxMemory);
+    }
+    if (partitionResolver != null) {
+      paf.setPartitionResolver(partitionResolver);
+    }
+
+    RegionFactory<K, V> regionFactory;
+    if (diskStoreName != null) {
+      // create DiskStore
+      if (cache.findDiskStore(diskStoreName) == null) {
+        cache.createDiskStoreFactory()
+            .setDiskDirs(getDiskDirs())
+            .create(diskStoreName);
+      }
+
+      regionFactory = cache.createRegionFactory(PARTITION_PERSISTENT);
+      regionFactory.setDiskStoreName(diskStoreName);
+    } else {
+      regionFactory = cache.createRegionFactory(PARTITION);
+    }
+
+    return regionFactory
+        .setConcurrencyChecksEnabled(true)
+        .setPartitionAttributes(paf.create())
+        .create(regionName);
+  }
+
+  private void createRegionsInClientCache(String poolName) {
+    ClientRegionFactory<Object, Object> proxyRegionFactory =
+        CLIENT.get().createClientRegionFactory(ClientRegionShortcut.PROXY);
+    proxyRegionFactory.setPoolName(poolName);
+
+    proxyRegionFactory.create(PARTITIONED_REGION_NAME);
+
+    ClientRegionFactory<Object, Object> localRegionFactory =
+        CLIENT.get().createClientRegionFactory(ClientRegionShortcut.LOCAL);
+    localRegionFactory.setConcurrencyChecksEnabled(true);
+    localRegionFactory.setPoolName(poolName);
+
+    localRegionFactory.create(CUSTOMER_REGION_NAME);
+    localRegionFactory.create(ORDER_REGION_NAME);
+    localRegionFactory.create(SHIPMENT_REGION_NAME);
+    localRegionFactory.create(REPLICATE_REGION_NAME);
+  }
+
+  private void createRegionsInOldClient(String poolName) {
+    CACHE.get().createRegionFactory()
+        .setDataPolicy(DataPolicy.EMPTY)
+        .setPoolName(poolName)
+        .create(PARTITIONED_REGION_NAME);
+
+    RegionFactory<Object, Object> localRegionFactory = CACHE.get().createRegionFactory(LOCAL)
+        .setConcurrencyChecksEnabled(true)
+        .setPoolName(poolName);
+
+    localRegionFactory.create(CUSTOMER_REGION_NAME);
+    localRegionFactory.create(ORDER_REGION_NAME);
+    localRegionFactory.create(SHIPMENT_REGION_NAME);
+    localRegionFactory.create(REPLICATE_REGION_NAME);
+  }
+
+  private int startServer() throws IOException {
+    CacheServer cacheServer = SERVER.get().getCache().addCacheServer();
     cacheServer.setHostnameForClients("localhost");
     cacheServer.setPort(0);
     cacheServer.start();
 
-    int redundantCopies = 0;
-    int totalNumberOfBuckets = 8;
-
-    testRegion = createBasicPartitionedRegion(redundantCopies, totalNumberOfBuckets,
-        LOCAL_MAX_MEMORY_DEFAULT);
-
-    customerRegion = createColocatedRegion(CUSTOMER, null, redundantCopies, totalNumberOfBuckets,
-        LOCAL_MAX_MEMORY_DEFAULT);
-
-    orderRegion = createColocatedRegion(ORDER, CUSTOMER, redundantCopies, totalNumberOfBuckets,
-        LOCAL_MAX_MEMORY_DEFAULT);
-
-    shipmentRegion = createColocatedRegion(SHIPMENT, ORDER, redundantCopies, totalNumberOfBuckets,
-        LOCAL_MAX_MEMORY_DEFAULT);
-
     return cacheServer.getPort();
   }
 
+  private int startLocator() throws IOException {
+    LocatorLauncher locatorLauncher = new LocatorLauncher.Builder()
+        .setDeletePidFileOnStop(true)
+        .setPort(0)
+        .setWorkingDirectory(getWorkingDirectory())
+        .set(ENABLE_CLUSTER_CONFIGURATION, "false")
+        .build();
+    locatorLauncher.start();
+
+    LOCATOR.set(locatorLauncher);
+
+    return locatorLauncher.getLocator().getPort();
+  }
+
+  private void stopServer() {
+    for (CacheServer cacheServer : SERVER.get().getCache().getCacheServers()) {
+      cacheServer.stop();
+    }
+  }
+
+  private void doPuts(Region<Object, Object> region) {
+    region.put(0, "create0");
+    region.put(1, "create1");
+    region.put(2, "create2");
+    region.put(3, "create3");
+  }
+
+  private void doManyPuts(Region<Object, Object> region) {
+    region.put(0, "create0");
+    region.put(1, "create1");
+    region.put(2, "create2");
+    region.put(3, "create3");
+    for (int i = 0; i < 40; i++) {
+      region.put(i, "create" + i);
+    }
+  }
+
+  private void putIntoPartitionedRegions() {
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
+    Region<Object, Object> customerRegion = getRegion(CUSTOMER_REGION_NAME);
+    Region<Object, Object> orderRegion = getRegion(ORDER_REGION_NAME);
+    Region<Object, Object> shipmentRegion = getRegion(SHIPMENT_REGION_NAME);
+    Region<Object, Object> replicateRegion = getRegion(REPLICATE_REGION_NAME);
+
+    for (int i = 0; i <= 3; i++) {
+      CustomerId customerId = new CustomerId(i);
+      Customer customer = new Customer("name" + i, "Address" + i);
+      customerRegion.put(customerId, customer);
+
+      for (int j = 1; j <= 10; j++) {
+        int oid = i * 10 + j;
+        OrderId orderId = new OrderId(oid, customerId);
+        Order order = new Order("Order" + oid);
+        orderRegion.put(orderId, order);
+
+        for (int k = 1; k <= 10; k++) {
+          int sid = oid * 10 + k;
+          ShipmentId shipmentId = new ShipmentId(sid, orderId);
+          Shipment shipment = new Shipment("Shipment" + sid);
+          shipmentRegion.put(shipmentId, shipment);
+        }
+      }
+    }
+
+    partitionedRegion.put(0, "create0");
+    partitionedRegion.put(1, "create1");
+    partitionedRegion.put(2, "create2");
+    partitionedRegion.put(3, "create3");
+
+    partitionedRegion.put(0, "update0");
+    partitionedRegion.put(1, "update1");
+    partitionedRegion.put(2, "update2");
+    partitionedRegion.put(3, "update3");
+
+    partitionedRegion.put(0, "update00");
+    partitionedRegion.put(1, "update11");
+    partitionedRegion.put(2, "update22");
+    partitionedRegion.put(3, "update33");
+
+    Map<Object, Object> map = new HashMap<>();
+    map.put(1, 1);
+    replicateRegion.putAll(map);
+  }
+
+  private void doPutAlls(Region<Object, Object> region) {
+    Map<Object, Object> map = new HashMap<>();
+    map.put(0, 0);
+    map.put(1, 1);
+    map.put(2, 2);
+    map.put(3, 3);
+
+    region.putAll(map, "putAllCallback");
+    region.putAll(map);
+    region.putAll(map);
+    region.putAll(map);
+  }
+
+  private void doGets() {
+    Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME);
+    Region<Object, Object> customerRegion = getRegion(CUSTOMER_REGION_NAME);
+    Region<Object, Object> orderRegion = getRegion(ORDER_REGION_NAME);
+    Region<Object, Object> shipmentRegion = getRegion(SHIPMENT_REGION_NAME);
+
+    for (int i = 0; i <= 3; i++) {
+      CustomerId customerId = new CustomerId(i);
+      Customer customer = new Customer("name" + i, "Address" + i);
+      customerRegion.get(customerId, customer);
+
+      for (int j = 1; j <= 10; j++) {
+        int oid = i * 10 + j;
+        OrderId orderId = new OrderId(oid, customerId);
+        Order order = new Order("Order" + oid);
+        orderRegion.get(orderId, order);
+
+        for (int k = 1; k <= 10; k++) {
+          int sid = oid * 10 + k;
+          ShipmentId shipmentId = new ShipmentId(sid, orderId);
+          Shipment shipment = new Shipment("Shipment" + sid);
+          shipmentRegion.get(shipmentId, shipment);
+        }
+      }
+    }
+
+    partitionedRegion.get(0, "create0");
+    partitionedRegion.get(1, "create1");
+    partitionedRegion.get(2, "create2");
+    partitionedRegion.get(3, "create3");
+
+    partitionedRegion.get(0, "update0");
+    partitionedRegion.get(1, "update1");
+    partitionedRegion.get(2, "update2");
+    partitionedRegion.get(3, "update3");
+
+    partitionedRegion.get(0, "update00");
+    partitionedRegion.get(1, "update11");
+    partitionedRegion.get(2, "update22");
+    partitionedRegion.get(3, "update33");
+  }
+
+  private void executeFunctions(Region<Object, Object> region) {
+    cast(FunctionService.onRegion(region))
+        .withFilter(filter(0))
+        .execute(new PutFunction())
+        .getResult();
+
+    cast(FunctionService.onRegion(region))
+        .withFilter(filter(0, 1))
+        .execute(new PutFunction())
+        .getResult();
+
+    cast(FunctionService.onRegion(region))
+        .withFilter(filter(0, 1, 2, 3))
+        .execute(new PutFunction())
+        .getResult();
+
+    cast(FunctionService.onRegion(region))
+        .execute(new PutFunction())
+        .getResult();
+  }
+
+  private Set<Object> filter(Object... values) {
+    return Arrays.stream(values).collect(Collectors.toSet());
+  }
+
   private void clearMetadata() {
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
+    ClientMetadataService clientMetadataService =
+        getInternalCache(SERVER.get()).getClientMetadataService();
     clientMetadataService.getClientPartitionAttributesMap().clear();
     clientMetadataService.getClientPRMetadata_TEST_ONLY().clear();
   }
 
+  private String getWorkingDirectory() throws IOException {
+    int vmId = getVMId();
+    File directory = new File(temporaryFolder.getRoot(), "VM-" + vmId);
+    if (!directory.exists()) {
+      temporaryFolder.newFolder("VM-" + vmId);
+    }
+    return directory.getAbsolutePath();
+  }
+
+  private File getDiskDir() throws IOException {
+    File file = new File(temporaryFolder.getRoot(), diskStoreName + getVMId());
+    if (!file.exists()) {
+      temporaryFolder.newFolder(diskStoreName + getVMId());
+    }
+    return file.getAbsoluteFile();
+  }
+
+  private File[] getDiskDirs() throws IOException {
+    return new File[] {getDiskDir()};
+  }
+
+  private InternalCache getInternalCache(ServerLauncher serverLauncher) {
+    return cast(serverLauncher.getCache());
+  }
+
+  private void waitForLocalBucketsCreation() {
+    PartitionedRegion pr = (PartitionedRegion) getRegion(PARTITIONED_REGION_NAME);
+
+    await().untilAsserted(() -> assertThat(pr.getDataStore().getAllLocalBuckets()).hasSize(4));
+  }
+
+  private void verifyDeadServer(Map<String, ClientPartitionAdvisor> regionMetaData, Region region,
+      int port0, int port1) {
+    ClientPartitionAdvisor prMetaData = regionMetaData.get(region.getFullPath());
+    Set<Entry<Integer, List<BucketServerLocation66>>> bucketLocationsMap =
+        prMetaData.getBucketServerLocationsMap_TEST_ONLY().entrySet();
+
+    for (Entry<Integer, List<BucketServerLocation66>> entry : bucketLocationsMap) {
+      for (BucketServerLocation66 bucketLocation : entry.getValue()) {
+        assertThat(bucketLocation.getPort())
+            .isNotEqualTo(port0)
+            .isNotEqualTo(port1);
+      }
+    }
+  }
+
   private void verifyMetadata(Map<Integer, List<BucketServerLocation66>> clientBucketMap) {
-    PartitionedRegion pr = (PartitionedRegion) testRegion;
+    PartitionedRegion pr = (PartitionedRegion) getRegion(PARTITIONED_REGION_NAME);
     Map<Integer, Set<ServerBucketProfile>> serverBucketMap =
         pr.getRegionAdvisor().getAllClientBucketProfilesTest();
 
-    assertThat(serverBucketMap).hasSize(clientBucketMap.size());
-    assertThat(clientBucketMap.keySet()).containsAll(serverBucketMap.keySet());
+    assertThat(serverBucketMap)
+        .hasSize(clientBucketMap.size());
+    assertThat(serverBucketMap.keySet())
+        .containsAll(clientBucketMap.keySet());
 
     for (Entry<Integer, List<BucketServerLocation66>> entry : clientBucketMap.entrySet()) {
       int bucketId = entry.getKey();
@@ -998,630 +1439,47 @@
       assertThat(countOfPrimaries).isEqualTo(1);
 
       Set<ServerBucketProfile> bucketProfiles = serverBucketMap.get(bucketId);
+
       assertThat(bucketProfiles).hasSize(bucketLocations.size());
 
       countOfPrimaries = 0;
       for (ServerBucketProfile bucketProfile : bucketProfiles) {
-        ServerLocation sl = (ServerLocation) bucketProfile.getBucketServerLocations().toArray()[0];
-        assertThat(bucketLocations.contains(sl)).isTrue();
-        // should be only one primary
-        if (bucketProfile.isPrimary) {
-          countOfPrimaries++;
-          assertThat(sl).isEqualTo(primaryBucketLocation);
+        for (BucketServerLocation66 bucketLocation : bucketProfile.getBucketServerLocations()) {
+
+          assertThat(bucketLocations).contains(bucketLocation);
+
+          // should be only one primary
+          if (bucketProfile.isPrimary) {
+            countOfPrimaries++;
+
+            assertThat(bucketLocation).isEqualTo(primaryBucketLocation);
+          }
         }
       }
+
       assertThat(countOfPrimaries).isEqualTo(1);
     }
   }
 
-  private void waitForLocalBucketsCreation() {
-    PartitionedRegion pr = (PartitionedRegion) testRegion;
-
-    await().untilAsserted(() -> assertThat(pr.getDataStore().getAllLocalBuckets()).hasSize(4));
-  }
-
-  private void verifyDeadServer(Map<String, ClientPartitionAdvisor> regionMetaData, Region region,
-      int port0, int port1) {
-
-    ClientPartitionAdvisor prMetaData = regionMetaData.get(region.getFullPath());
-
-    for (Entry<Integer, List<BucketServerLocation66>> entry : prMetaData
-        .getBucketServerLocationsMap_TEST_ONLY().entrySet()) {
-      for (BucketServerLocation66 bsl : entry.getValue()) {
-        assertThat(bsl.getPort())
-            .isNotEqualTo(port0)
-            .isNotEqualTo(port1);
-      }
+  private InternalCache getCache() {
+    if (CACHE.get() != DUMMY_CACHE) {
+      return CACHE.get();
     }
-  }
-
-  private void createClientWithoutPRSingleHopEnabled(int port0) {
-    Properties properties = new Properties();
-    properties.setProperty(LOCATORS, "");
-
-    cache = cacheRule.getOrCreateCache(properties);
-
-    System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true");
-
-    Pool pool;
-    try {
-      pool = PoolManager.createFactory()
-          .addServer("localhost", port0)
-          .setPingInterval(250)
-          .setSubscriptionEnabled(true)
-          .setSubscriptionRedundancy(-1)
-          .setReadTimeout(2000)
-          .setSocketBufferSize(1000)
-          .setMinConnections(6)
-          .setMaxConnections(10)
-          .setRetryAttempts(3)
-          .setPRSingleHopEnabled(false)
-          .create(PR_NAME);
-    } finally {
-      System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "false");
+    if (SERVER.get() != DUMMY_SERVER) {
+      return (InternalCache) SERVER.get().getCache();
     }
-
-    createRegionsInClientCache(pool.getName());
-  }
-
-  private int createAccessorServer() throws IOException {
-    cache = cacheRule.getOrCreateCache();
-
-    CacheServer cacheServer = cache.addCacheServer();
-    cacheServer.setPort(0);
-    cacheServer.setHostnameForClients("localhost");
-    cacheServer.start();
-
-    int redundantCopies = 1;
-    int totalNumberOfBuckets = 4;
-    int localMaxMemory = 0;
-
-    testRegion =
-        createBasicPartitionedRegion(redundantCopies, totalNumberOfBuckets, localMaxMemory);
-
-    customerRegion =
-        createColocatedRegion(CUSTOMER, null, redundantCopies, totalNumberOfBuckets,
-            localMaxMemory);
-
-    orderRegion =
-        createColocatedRegion(ORDER, CUSTOMER, redundantCopies, totalNumberOfBuckets,
-            localMaxMemory);
-
-    shipmentRegion =
-        createColocatedRegion(SHIPMENT, ORDER, redundantCopies, totalNumberOfBuckets,
-            localMaxMemory);
-
-    RegionFactory<Object, Object> regionFactory = cache.createRegionFactory();
-    replicatedRegion = regionFactory.create("rr");
-
-    return cacheServer.getPort();
-  }
-
-  private <K, V> Region<K, V> createBasicPartitionedRegion(int redundantCopies,
-      int totalNumberOfBuckets,
-      int localMaxMemory) {
-    PartitionAttributesFactory<K, V> paf = new PartitionAttributesFactory<K, V>()
-        .setRedundantCopies(redundantCopies)
-        .setTotalNumBuckets(totalNumberOfBuckets);
-
-    if (localMaxMemory > -1) {
-      paf.setLocalMaxMemory(localMaxMemory);
+    if (CLIENT.get() != DUMMY_CACHE) {
+      return (InternalCache) CLIENT.get();
     }
-
-    return cache.<K, V>createRegionFactory()
-        .setPartitionAttributes(paf.create())
-        .setConcurrencyChecksEnabled(true)
-        .create(PR_NAME);
+    return null;
   }
 
-  private <K, V> Region<K, V> createColocatedRegion(String regionName, String colocatedRegionName,
-      int redundantCopies, int totalNumberOfBuckets, int localMaxMemory) {
-    PartitionAttributesFactory<K, V> paf = new PartitionAttributesFactory<K, V>()
-        .setRedundantCopies(redundantCopies)
-        .setTotalNumBuckets(totalNumberOfBuckets)
-        .setPartitionResolver(new CustomerIdPartitionResolver<>("CustomerIDPartitionResolver"));
-
-    if (colocatedRegionName != null) {
-      paf.setColocatedWith(colocatedRegionName);
+  private Region<Object, Object> getRegion(String name) {
+    InternalCache cache = getCache();
+    if (cache != null) {
+      return cache.getRegion(name);
     }
-    if (localMaxMemory > -1) {
-      paf.setLocalMaxMemory(localMaxMemory);
-    }
-
-    return cache.<K, V>createRegionFactory()
-        .setPartitionAttributes(paf.create())
-        .setConcurrencyChecksEnabled(true)
-        .create(regionName);
-  }
-
-  private int createServer(int redundantCopies, int totalNumberOfBuckets) throws IOException {
-    cache = cacheRule.getOrCreateCache();
-
-    CacheServer cacheServer = cache.addCacheServer();
-    cacheServer.setPort(0);
-    cacheServer.setHostnameForClients("localhost");
-    cacheServer.start();
-
-    testRegion = createBasicPartitionedRegion(redundantCopies, totalNumberOfBuckets, -1);
-
-    // creating colocated Regions
-    customerRegion =
-        createColocatedRegion(CUSTOMER, null, redundantCopies, totalNumberOfBuckets,
-            LOCAL_MAX_MEMORY_DEFAULT);
-
-    orderRegion =
-        createColocatedRegion(ORDER, CUSTOMER, redundantCopies, totalNumberOfBuckets,
-            LOCAL_MAX_MEMORY_DEFAULT);
-
-    shipmentRegion =
-        createColocatedRegion(SHIPMENT, ORDER, redundantCopies, totalNumberOfBuckets,
-            LOCAL_MAX_MEMORY_DEFAULT);
-
-    replicatedRegion = cache.createRegionFactory().create("rr");
-
-    return cacheServer.getPort();
-  }
-
-  private int createPersistentPrsAndServer() throws IOException {
-    cache = cacheRule.getOrCreateCache();
-
-    if (cache.findDiskStore(diskStoreName) == null) {
-      cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName);
-    }
-
-    testRegion = createBasicPersistentPartitionRegion();
-
-    // creating colocated Regions
-
-    int redundantCopies = 3;
-    int totalNumberOfBuckets = 4;
-
-    customerRegion = createColocatedPersistentRegionForTest(CUSTOMER, null,
-        redundantCopies, totalNumberOfBuckets, LOCAL_MAX_MEMORY_DEFAULT);
-
-    orderRegion = createColocatedPersistentRegionForTest(ORDER, CUSTOMER, redundantCopies,
-        totalNumberOfBuckets, LOCAL_MAX_MEMORY_DEFAULT);
-
-    shipmentRegion = createColocatedPersistentRegionForTest(SHIPMENT, ORDER, redundantCopies,
-        totalNumberOfBuckets, LOCAL_MAX_MEMORY_DEFAULT);
-
-    RegionFactory<Object, Object> regionFactory = cache.createRegionFactory();
-
-    replicatedRegion = regionFactory.create("rr");
-
-    CacheServer cacheServer = cache.addCacheServer();
-    cacheServer.setPort(0);
-    cacheServer.setHostnameForClients("localhost");
-    cacheServer.start();
-
-    return cacheServer.getPort();
-  }
-
-  private <K, V> Region<K, V> createBasicPersistentPartitionRegion() {
-    PartitionAttributesFactory<K, V> paf = new PartitionAttributesFactory<K, V>()
-        .setRedundantCopies(3)
-        .setTotalNumBuckets(4);
-
-    return cache.<K, V>createRegionFactory()
-        .setDataPolicy(DataPolicy.PERSISTENT_PARTITION)
-        .setDiskStoreName("disk")
-        .setPartitionAttributes(paf.create())
-        .create(PR_NAME);
-  }
-
-  private <K, V> Region<K, V> createColocatedPersistentRegionForTest(String regionName,
-      String colocatedRegionName, int redundantCopies, int totalNumberOfBuckets,
-      int localMaxMemory) {
-
-    PartitionAttributesFactory<K, V> paf = new PartitionAttributesFactory<K, V>()
-        .setRedundantCopies(redundantCopies)
-        .setTotalNumBuckets(totalNumberOfBuckets)
-        .setPartitionResolver(new CustomerIdPartitionResolver<>("CustomerIDPartitionResolver"));
-
-    if (localMaxMemory > -1) {
-      paf.setLocalMaxMemory(localMaxMemory);
-    }
-    if (colocatedRegionName != null) {
-      paf.setColocatedWith(colocatedRegionName);
-    }
-
-    RegionFactory<K, V> regionFactory = cache.<K, V>createRegionFactory()
-        .setDataPolicy(DataPolicy.PERSISTENT_PARTITION)
-        .setDiskStoreName("disk")
-        .setPartitionAttributes(paf.create());
-
-    return regionFactory.create(regionName);
-  }
-
-  private void createPersistentPrsAndServerOnPort(int port) throws IOException {
-    cache = cacheRule.getOrCreateCache();
-
-    if (cache.findDiskStore(diskStoreName) == null) {
-      cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName);
-    }
-
-    testRegion = createBasicPersistentPartitionRegion();
-
-    // creating colocated Regions
-    int redundantCopies = 3;
-    int totalNumberOfBuckets = 4;
-
-    customerRegion =
-        createColocatedPersistentRegionForTest(CUSTOMER, null, redundantCopies,
-            totalNumberOfBuckets, LOCAL_MAX_MEMORY_DEFAULT);
-
-    orderRegion =
-        createColocatedPersistentRegionForTest(ORDER, CUSTOMER, redundantCopies,
-            totalNumberOfBuckets, LOCAL_MAX_MEMORY_DEFAULT);
-
-    shipmentRegion =
-        createColocatedPersistentRegionForTest(SHIPMENT, ORDER, redundantCopies,
-            totalNumberOfBuckets, LOCAL_MAX_MEMORY_DEFAULT);
-
-    replicatedRegion = cache.createRegionFactory()
-        .create("rr");
-
-    CacheServer cacheServer = cache.addCacheServer();
-    cacheServer.setPort(port);
-    cacheServer.setHostnameForClients("localhost");
-    cacheServer.start();
-  }
-
-  private void startServerOnPort(int port) throws IOException {
-    cache = cacheRule.getOrCreateCache();
-
-    CacheServer cacheServer = cache.addCacheServer();
-    cacheServer.setPort(port);
-    cacheServer.setHostnameForClients("localhost");
-    cacheServer.start();
-  }
-
-  private void createPeer() {
-    cache = cacheRule.getOrCreateCache();
-
-    int redundantCopies = 1;
-    int totalNumberOfBuckets = 4;
-
-    testRegion = createBasicPartitionedRegion(redundantCopies, totalNumberOfBuckets, -1);
-
-    customerRegion =
-        createColocatedRegion(CUSTOMER, null, redundantCopies, totalNumberOfBuckets, -1);
-
-    orderRegion =
-        createColocatedRegion(ORDER, CUSTOMER, redundantCopies, totalNumberOfBuckets, -1);
-
-    shipmentRegion =
-        createColocatedRegion(SHIPMENT, ORDER, redundantCopies, totalNumberOfBuckets, -1);
-
-    replicatedRegion = cache.createRegionFactory().create("rr");
-  }
-
-  private void createClient(int port) {
-    Properties properties = new Properties();
-    properties.setProperty(LOCATORS, "");
-
-    cache = cacheRule.getOrCreateCache(properties);
-
-    System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true");
-    Pool pool;
-    try {
-      pool = PoolManager.createFactory()
-          .addServer("localhost", port)
-          .setPingInterval(250)
-          .setSubscriptionEnabled(true)
-          .setSubscriptionRedundancy(-1)
-          .setReadTimeout(2000)
-          .setSocketBufferSize(1000)
-          .setMinConnections(6)
-          .setMaxConnections(10)
-          .setRetryAttempts(3)
-          .create(PR_NAME);
-    } finally {
-      System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "false");
-    }
-
-    createRegionsInClientCache(pool.getName());
-  }
-
-  private void createClient(int port0, int port1) {
-    Properties properties = new Properties();
-    properties.setProperty(LOCATORS, "");
-
-    cache = cacheRule.getOrCreateCache(properties);
-
-    System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true");
-    Pool pool;
-    try {
-      pool = PoolManager.createFactory()
-          .addServer("localhost", port0)
-          .addServer("localhost", port1)
-          .setPingInterval(250)
-          .setSubscriptionEnabled(true)
-          .setSubscriptionRedundancy(-1)
-          .setReadTimeout(2000)
-          .setSocketBufferSize(1000)
-          .setMinConnections(6)
-          .setMaxConnections(10)
-          .setRetryAttempts(3)
-          .create(PR_NAME);
-    } finally {
-      System.clearProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints");
-    }
-
-    createRegionsInClientCache(pool.getName());
-  }
-
-  private void createClientWithLocator(String host, int port0) {
-    Properties properties = new Properties();
-    properties.setProperty(LOCATORS, "");
-
-    cache = cacheRule.getOrCreateCache(properties);
-
-    System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true");
-    Pool pool;
-    try {
-      pool = PoolManager.createFactory()
-          .addLocator(host, port0)
-          .setPingInterval(250)
-          .setSubscriptionEnabled(true)
-          .setSubscriptionRedundancy(-1)
-          .setReadTimeout(2000)
-          .setSocketBufferSize(1000)
-          .setMinConnections(6)
-          .setMaxConnections(10)
-          .setRetryAttempts(3)
-          .create(PR_NAME);
-    } finally {
-      System.clearProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints");
-    }
-
-    createRegionsInClientCache(pool.getName());
-  }
-
-  private void createClient(int port0, int port1, int port2, int port3) {
-    Properties properties = new Properties();
-    properties.setProperty(LOCATORS, "");
-
-    cache = cacheRule.getOrCreateCache(properties);
-
-    System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true");
-    Pool pool;
-    try {
-      pool = PoolManager.createFactory()
-          .addServer("localhost", port0)
-          .addServer("localhost", port1)
-          .addServer("localhost", port2)
-          .addServer("localhost", port3)
-          .setPingInterval(100)
-          .setSubscriptionEnabled(false)
-          .setReadTimeout(2000)
-          .setSocketBufferSize(1000)
-          .setMinConnections(6)
-          .setMaxConnections(10)
-          .setRetryAttempts(3)
-          .create(PR_NAME);
-    } finally {
-      System.clearProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints");
-    }
-
-    createRegionsInClientCache(pool.getName());
-  }
-
-  private void createRegionsInClientCache(String poolName) {
-    testRegion = cache.createRegionFactory()
-        .setDataPolicy(DataPolicy.EMPTY)
-        .setPoolName(poolName)
-        .create(PR_NAME);
-
-    customerRegion = cache.createRegionFactory()
-        .setConcurrencyChecksEnabled(true)
-        .setPoolName(poolName)
-        .setScope(Scope.LOCAL)
-        .create(CUSTOMER);
-
-    orderRegion = cache.createRegionFactory()
-        .setConcurrencyChecksEnabled(true)
-        .setPoolName(poolName)
-        .setScope(Scope.LOCAL)
-        .create(ORDER);
-
-    shipmentRegion = cache.createRegionFactory()
-        .setConcurrencyChecksEnabled(true)
-        .setPoolName(poolName)
-        .setScope(Scope.LOCAL)
-        .create(SHIPMENT);
-
-    replicatedRegion = cache.createRegionFactory()
-        .setConcurrencyChecksEnabled(true)
-        .setPoolName(poolName)
-        .setScope(Scope.LOCAL)
-        .create("rr");
-  }
-
-  private void putIntoPartitionedRegions() {
-    for (int i = 0; i <= 3; i++) {
-      CustomerId custid = new CustomerId(i);
-      Customer customer = new Customer("name" + i, "Address" + i);
-      customerRegion.put(custid, customer);
-
-      for (int j = 1; j <= 10; j++) {
-        int oid = i * 10 + j;
-        OrderId orderId = new OrderId(oid, custid);
-        Order order = new Order("Order" + oid);
-        orderRegion.put(orderId, order);
-
-        for (int k = 1; k <= 10; k++) {
-          int sid = oid * 10 + k;
-          ShipmentId shipmentId = new ShipmentId(sid, orderId);
-          Shipment shipment = new Shipment("Shipment" + sid);
-          shipmentRegion.put(shipmentId, shipment);
-        }
-      }
-    }
-
-    testRegion.put(0, "create0");
-    testRegion.put(1, "create1");
-    testRegion.put(2, "create2");
-    testRegion.put(3, "create3");
-
-    testRegion.put(0, "update0");
-    testRegion.put(1, "update1");
-    testRegion.put(2, "update2");
-    testRegion.put(3, "update3");
-
-    testRegion.put(0, "update00");
-    testRegion.put(1, "update11");
-    testRegion.put(2, "update22");
-    testRegion.put(3, "update33");
-
-    Map<Object, Object> map = new HashMap<>();
-    map.put(1, 1);
-    replicatedRegion.putAll(map);
-  }
-
-  private File getDiskDir() {
-    try {
-      File file = new File(temporaryFolder.getRoot(), diskStoreName + getVMId());
-      if (!file.exists()) {
-        temporaryFolder.newFolder(diskStoreName + getVMId());
-      }
-      return file.getAbsoluteFile();
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
-  }
-
-  private File[] getDiskDirs() {
-    return new File[] {getDiskDir()};
-  }
-
-  private void executeFunctions() {
-    Set<Object> filter = new HashSet<>();
-    filter.add(0);
-    FunctionService.onRegion(testRegion).withFilter(filter).execute(new PutFunction())
-        .getResult();
-    filter.add(1);
-    FunctionService.onRegion(testRegion).withFilter(filter).execute(new PutFunction())
-        .getResult();
-    filter.add(2);
-    filter.add(3);
-    FunctionService.onRegion(testRegion).withFilter(filter).execute(new PutFunction())
-        .getResult();
-    FunctionService.onRegion(testRegion).execute(new PutFunction()).getResult();
-  }
-
-  private void putAll() {
-    Map<Object, Object> map = new HashMap<>();
-    map.put(0, 0);
-    map.put(1, 1);
-    map.put(2, 2);
-    map.put(3, 3);
-    testRegion.putAll(map, "putAllCallback");
-    testRegion.putAll(map);
-    testRegion.putAll(map);
-    testRegion.putAll(map);
-  }
-
-  private void put() {
-    testRegion.put(0, "create0");
-    testRegion.put(1, "create1");
-    testRegion.put(2, "create2");
-    testRegion.put(3, "create3");
-    for (int i = 0; i < 40; i++) {
-      testRegion.put(i, "create" + i);
-    }
-  }
-
-  private void getFromPartitionedRegions() {
-    for (int i = 0; i <= 3; i++) {
-      CustomerId custid = new CustomerId(i);
-      Customer customer = new Customer("name" + i, "Address" + i);
-      customerRegion.get(custid, customer);
-
-      for (int j = 1; j <= 10; j++) {
-        int oid = i * 10 + j;
-        OrderId orderId = new OrderId(oid, custid);
-        Order order = new Order("Order" + oid);
-        orderRegion.get(orderId, order);
-
-        for (int k = 1; k <= 10; k++) {
-          int sid = oid * 10 + k;
-          ShipmentId shipmentId = new ShipmentId(sid, orderId);
-          Shipment shipment = new Shipment("Shipment" + sid);
-          shipmentRegion.get(shipmentId, shipment);
-        }
-      }
-    }
-
-    testRegion.get(0, "create0");
-    testRegion.get(1, "create1");
-    testRegion.get(2, "create2");
-    testRegion.get(3, "create3");
-
-    testRegion.get(0, "update0");
-    testRegion.get(1, "update1");
-    testRegion.get(2, "update2");
-    testRegion.get(3, "update3");
-
-    testRegion.get(0, "update00");
-    testRegion.get(1, "update11");
-    testRegion.get(2, "update22");
-    testRegion.get(3, "update33");
-  }
-
-  private void putIntoSinglePR() {
-    testRegion.put(0, "create0");
-    testRegion.put(1, "create1");
-    testRegion.put(2, "create2");
-    testRegion.put(3, "create3");
-  }
-
-  private void updateIntoSinglePR() {
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
-
-    clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false);
-
-    testRegion.put(0, "update0");
-    assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
-
-    testRegion.put(1, "update1");
-    assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
-
-    testRegion.put(2, "update2");
-    assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
-
-    testRegion.put(3, "update3");
-    assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
-
-    testRegion.put(0, "update00");
-    assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
-
-    testRegion.put(1, "update11");
-    assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
-
-    testRegion.put(2, "update22");
-    assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
-
-    testRegion.put(3, "update33");
-    assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse();
-  }
-
-  private void verifyEmptyMetadata() {
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
-    assertThat(clientMetadataService.getClientPRMetadata_TEST_ONLY()).isEmpty();
-  }
-
-  private void verifyEmptyStaticData() {
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
-    assertThat(clientMetadataService.getClientPartitionAttributesMap()).isEmpty();
-  }
-
-  private void verifyMetadata() {
-    ClientMetadataService clientMetadataService = cache.getClientMetadataService();
-    // make sure all fetch tasks are completed
-    await().untilAsserted(() -> {
-      assertThat(clientMetadataService.getRefreshTaskCount_TEST_ONLY()).isZero();
-    });
+    throw new IllegalStateException("Cache or region not found");
   }
 
   private static class PutFunction extends FunctionAdapter implements DataSerializable {
@@ -1996,15 +1854,7 @@
 
   public static class CustomerIdPartitionResolver<K, V> implements PartitionResolver<K, V> {
 
-    private String id;
-
-    public CustomerIdPartitionResolver() {
-      // required
-    }
-
-    public CustomerIdPartitionResolver(String id) {
-      this.id = id;
-    }
+    private final String id = getClass().getSimpleName();
 
     @Override
     public String getName() {
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/JMXMBeanReconnectDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/JMXMBeanReconnectDUnitTest.java
index d1991c6..ad591ac 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/JMXMBeanReconnectDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/JMXMBeanReconnectDUnitTest.java
@@ -32,7 +32,9 @@
 import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
+import static org.apache.geode.test.dunit.Disconnect.disconnectAllFromDS;
 import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
 import static org.apache.geode.test.dunit.VM.getVM;
 import static org.apache.geode.test.dunit.VM.getVMId;
 import static org.apache.geode.test.dunit.VM.toArray;
@@ -59,7 +61,6 @@
 
 import org.apache.geode.CancelException;
 import org.apache.geode.ForcedDisconnectException;
-import org.apache.geode.alerting.internal.spi.AlertingIOException;
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.distributed.DistributedSystemDisconnectedException;
 import org.apache.geode.distributed.LocatorLauncher;
@@ -81,6 +82,7 @@
 @SuppressWarnings("serial")
 public class JMXMBeanReconnectDUnitTest implements Serializable {
 
+  private static final long TIMEOUT_MILLIS = getTimeout().getValueInMS();
   private static final LocatorLauncher DUMMY_LOCATOR = mock(LocatorLauncher.class);
   private static final ServerLauncher DUMMY_SERVER = mock(ServerLauncher.class);
 
@@ -155,7 +157,6 @@
     String createRegionCommand = "create region --type=REPLICATE --name=" + SEPARATOR + regionName;
     gfsh.executeAndAssertThat(createRegionCommand).statusIsSuccess();
 
-    addIgnoredException(AlertingIOException.class);
     addIgnoredException(CacheClosedException.class);
     addIgnoredException(CancelException.class);
     addIgnoredException(DistributedSystemDisconnectedException.class);
@@ -201,14 +202,13 @@
 
   @After
   public void tearDown() {
-    for (VM vm : asList(serverVM, locator2VM, locator1VM)) {
-      vm.invoke(() -> {
-        BEFORE.get().countDown();
-        AFTER.get().countDown();
-        SERVER.getAndSet(DUMMY_SERVER).stop();
-        LOCATOR.getAndSet(DUMMY_LOCATOR).stop();
-      });
-    }
+    invokeInEveryVM(() -> {
+      BEFORE.get().countDown();
+      AFTER.get().countDown();
+      SERVER.getAndSet(DUMMY_SERVER).stop();
+      LOCATOR.getAndSet(DUMMY_LOCATOR).stop();
+    });
+    disconnectAllFromDS();
   }
 
   @Test
@@ -241,8 +241,7 @@
       await().untilAsserted(() -> {
         assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
             .as("GemFire mbeans on locator1")
-            .containsAll(expectedLocatorMXBeans(locator1Name))
-            .containsAll(expectedLocatorMXBeans(locator2Name));
+            .containsAll(expectedLocatorMXBeans(locator1Name));
       });
     });
 
@@ -250,8 +249,7 @@
       await().untilAsserted(() -> {
         assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
             .as("GemFire mbeans on locator2")
-            .containsAll(expectedLocatorMXBeans(locator2Name))
-            .containsAll(expectedLocatorMXBeans(locator1Name));
+            .containsAll(expectedLocatorMXBeans(locator2Name));
       });
     });
   }
@@ -350,7 +348,7 @@
             .isTrue();
       });
 
-      system.waitUntilReconnected(getTimeout().getValueInMS(), MILLISECONDS);
+      system.waitUntilReconnected(TIMEOUT_MILLIS, MILLISECONDS);
 
       await().untilAsserted(() -> {
         assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
@@ -384,7 +382,7 @@
         @Override
         public void reconnecting(InternalDistributedSystem oldSystem) {
           try {
-            BEFORE.get().await(getTimeout().getValueInMS(), MILLISECONDS);
+            BEFORE.get().await(TIMEOUT_MILLIS, MILLISECONDS);
           } catch (InterruptedException e) {
             errorCollector.addError(e);
           }
@@ -445,7 +443,7 @@
         @Override
         public void reconnecting(InternalDistributedSystem oldSystem) {
           try {
-            BEFORE.get().await(getTimeout().getValueInMS(), MILLISECONDS);
+            BEFORE.get().await(TIMEOUT_MILLIS, MILLISECONDS);
           } catch (InterruptedException e) {
             errorCollector.addError(e);
           }
@@ -480,7 +478,7 @@
 
     serverVM.invoke(() -> {
       BEFORE.get().countDown();
-      AFTER.get().await(getTimeout().getValueInMS(), MILLISECONDS);
+      AFTER.get().await(TIMEOUT_MILLIS, MILLISECONDS);
 
       await().untilAsserted(() -> {
         assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
@@ -509,7 +507,6 @@
   private static void startLocator(String name, File workingDirectory, int locatorPort, int jmxPort,
       String locators) {
     LOCATOR.set(new LocatorLauncher.Builder()
-        .setDeletePidFileOnStop(true)
         .setMemberName(name)
         .setPort(locatorPort)
         .setWorkingDirectory(workingDirectory.getAbsolutePath())
@@ -535,7 +532,6 @@
 
   private static void startServer(String name, File workingDirectory, String locators) {
     SERVER.set(new ServerLauncher.Builder()
-        .setDeletePidFileOnStop(true)
         .setDisableDefaultServer(true)
         .setMemberName(name)
         .setWorkingDirectory(workingDirectory.getAbsolutePath())
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/InternalClientCache.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/InternalClientCache.java
index c2d5be5..7e10090 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/InternalClientCache.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/InternalClientCache.java
@@ -48,4 +48,6 @@
   CachePerfStats getCachePerfStats();
 
   MeterRegistry getMeterRegistry();
+
+  ClientMetadataService getClientMetadataService();
 }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
index d16e7fd..7ea6532 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
@@ -737,7 +737,7 @@
       // UDP readers are throttled in the FC protocol, which queries
       // the queue to see if it should throttle
       if (stats.getInternalSerialQueueBytes() > TOTAL_SERIAL_QUEUE_THROTTLE
-          && !DistributionMessage.isPreciousThread()) {
+          && !DistributionMessage.isMembershipMessengerThread()) {
         do {
           boolean interrupted = Thread.interrupted();
           try {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
index 4fed3c1..0f11b6b 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
@@ -412,7 +412,8 @@
    */
   protected void schedule(final ClusterDistributionManager dm) {
     boolean inlineProcess = INLINE_PROCESS
-        && getProcessorType() == OperationExecutors.SERIAL_EXECUTOR && !isPreciousThread();
+        && getProcessorType() == OperationExecutors.SERIAL_EXECUTOR
+        && !isMembershipMessengerThread();
 
     boolean forceInline = this.acker != null || getInlineProcess() || Connection.isDominoThread();
 
@@ -476,13 +477,19 @@
   }
 
   /**
-   * returns true if the current thread should not be used for inline processing. i.e., it is a
-   * "precious" resource
+   * returns true if the current thread should not be used for inline processing because it
+   * is responsible for reading geode-membership messages. Blocking such a thread can cause
+   * a server to be kicked out
    */
-  public static boolean isPreciousThread() {
+  public static boolean isMembershipMessengerThread() {
     String thrname = Thread.currentThread().getName();
-    // return thrname.startsWith("Geode UDP");
-    return thrname.startsWith("unicast receiver") || thrname.startsWith("multicast receiver");
+
+    return isMembershipMessengerThreadName(thrname);
+  }
+
+  public static boolean isMembershipMessengerThreadName(String thrname) {
+    return thrname.startsWith("unicast receiver") || thrname.startsWith("multicast receiver")
+        || thrname.startsWith("Geode UDP");
   }
 
 
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ShutdownMessage.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ShutdownMessage.java
index 66cadc4..9d062ab 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ShutdownMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ShutdownMessage.java
@@ -71,27 +71,8 @@
   @Override
   protected void process(final ClusterDistributionManager dm) {
     Assert.assertTrue(this.id != null);
-    // The peer goes deaf after sending us this message, so do not
-    // attempt a reply.
-
-    // final ReplyMessage reply = new ReplyMessage();
-    // reply.setProcessorId(processorId);
-    // reply.setRecipient(getSender());
-    // can't send a response in a UDP receiver thread or we might miss
-    // the other side going away due to blocking receipt of views
-    // if (DistributionMessage.isPreciousThread()) {
-    // dm.getWaitingThreadPool().execute(new Runnable() {
-    // public void run() {
-    // dm.putOutgoing(reply);
-    // dm.handleManagerDeparture(ShutdownMessage.this.id, false, "shutdown message received");
-    // }
-    // });
-    // }
-    // else {
-    // dm.putOutgoing(reply);
     dm.shutdownMessageReceived(id,
         "shutdown message received");
-    // }
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ThrottlingMemLinkedQueueWithDMStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ThrottlingMemLinkedQueueWithDMStats.java
index a7a8c21..2432cd9 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ThrottlingMemLinkedQueueWithDMStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ThrottlingMemLinkedQueueWithDMStats.java
@@ -102,7 +102,7 @@
       throw new InterruptedException();
     // only block threads reading from tcp stream sockets. blocking udp
     // will cause retransmission storms
-    if (!DistributionMessage.isPreciousThread()) {
+    if (!DistributionMessage.isMembershipMessengerThread()) {
       long startTime = DistributionStats.getStatTime();
       do {
         try {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java
index cfaa24d..f012085 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java
@@ -62,17 +62,23 @@
   @Override
   protected void process(ClusterDistributionManager dm) {
     long latestLastAccessTime = 0L;
+    InternalCache cache = dm.getCache();
+    if (cache == null) {
+      return;
+    }
     InternalDistributedRegion region =
-        (InternalDistributedRegion) dm.getCache().getRegion(this.regionName);
-    if (region != null) {
-      RegionEntry entry = region.getRegionEntry(this.key);
-      if (entry != null) {
-        try {
-          latestLastAccessTime = entry.getLastAccessed();
-        } catch (InternalStatisticsDisabledException ignored) {
-          // last access time is not available
-        }
-      }
+        (InternalDistributedRegion) cache.getRegion(this.regionName);
+    if (region == null) {
+      return;
+    }
+    RegionEntry entry = region.getRegionEntry(this.key);
+    if (entry == null) {
+      return;
+    }
+    try {
+      latestLastAccessTime = entry.getLastAccessed();
+    } catch (InternalStatisticsDisabledException ignored) {
+      // last access time is not available
     }
     ReplyMessage.send(getSender(), this.processorId, latestLastAccessTime, dm);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/util/UncheckedUtils.java b/geode-core/src/main/java/org/apache/geode/internal/cache/util/UncheckedUtils.java
index 195df9f..c03e990 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/util/UncheckedUtils.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/util/UncheckedUtils.java
@@ -14,10 +14,16 @@
  */
 package org.apache.geode.internal.cache.util;
 
+import org.apache.geode.cache.execute.Execution;
+
 @SuppressWarnings({"unchecked", "unused"})
 public class UncheckedUtils {
 
   public static <T> T cast(Object object) {
     return (T) object;
   }
+
+  public static <IN, OUT, AGG> Execution<IN, OUT, AGG> cast(Execution execution) {
+    return execution;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
index b87316b..714c9cb 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
@@ -478,6 +478,9 @@
               return;
             }
             proxyFactory.createAllProxies(member, proxyMonitoringRegion);
+
+            managementCacheListener.markReady();
+            notifListener.markReady();
           } catch (Exception e) {
             if (logger.isDebugEnabled()) {
               logger.debug("Error During GII Proxy creation", e);
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/ManagementCacheListener.java b/geode-core/src/main/java/org/apache/geode/management/internal/ManagementCacheListener.java
index 4adae1d..3471552 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/ManagementCacheListener.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/ManagementCacheListener.java
@@ -19,41 +19,47 @@
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.Region;
 import org.apache.geode.cache.util.CacheListenerAdapter;
-import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
- * This listener is attached to the Monitoring Region to receive any addition or deletion of MBeans.
- * It updates the last refreshed time of proxy once it gets the update request from the Managed
- * Node.
+ * This listener is attached to the Monitoring Region to receive any addition or deletion of MBeans
+ *
+ * It updates the last refreshed time of proxy once it gets the update request from the Managed Node
+ *
+ *
  */
-class ManagementCacheListener extends CacheListenerAdapter<String, Object> {
+public class ManagementCacheListener extends CacheListenerAdapter<String, Object> {
+
   private static final Logger logger = LogService.getLogger();
 
-  private final MBeanProxyFactory proxyHelper;
+  private MBeanProxyFactory proxyHelper;
 
-  ManagementCacheListener(MBeanProxyFactory proxyHelper) {
+  private volatile boolean readyForEvents;
+
+  public ManagementCacheListener(MBeanProxyFactory proxyHelper) {
     this.proxyHelper = proxyHelper;
+    this.readyForEvents = false;
   }
 
   @Override
   public void afterCreate(EntryEvent<String, Object> event) {
+    if (!readyForEvents) {
+      return;
+    }
     ObjectName objectName = null;
 
     try {
       objectName = ObjectName.getInstance(event.getKey());
       Object newObject = event.getNewValue();
-      DistributedMember distributedMember = event.getDistributedMember();
-      Region<String, Object> region = event.getRegion();
-      proxyHelper.createProxy(distributedMember, objectName, region,
+      proxyHelper.createProxy(event.getDistributedMember(), objectName, event.getRegion(),
           newObject);
     } catch (Exception e) {
       if (logger.isDebugEnabled()) {
         logger.debug("Proxy Create failed for {} with exception {}", objectName, e.getMessage(), e);
       }
     }
+
   }
 
   @Override
@@ -70,12 +76,16 @@
             e);
       }
     }
+
   }
 
   @Override
   public void afterUpdate(EntryEvent<String, Object> event) {
     ObjectName objectName = null;
     try {
+      if (!readyForEvents) {
+        return;
+      }
       objectName = ObjectName.getInstance(event.getKey());
 
       ProxyInfo proxyInfo = proxyHelper.findProxyInfo(objectName);
@@ -94,6 +104,13 @@
       if (logger.isDebugEnabled()) {
         logger.debug("Proxy Update failed for {} with exception {}", objectName, e.getMessage(), e);
       }
+
     }
+
   }
+
+  void markReady() {
+    readyForEvents = true;
+  }
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/NotificationCacheListener.java b/geode-core/src/main/java/org/apache/geode/management/internal/NotificationCacheListener.java
index 0136068..f1c3057 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/NotificationCacheListener.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/NotificationCacheListener.java
@@ -14,35 +14,101 @@
  */
 package org.apache.geode.management.internal;
 
+
 import javax.management.Notification;
 
-import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.CacheListener;
 import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.cache.RegionEvent;
 
 /**
  * This listener will be attached to each notification region corresponding to a member
+ *
  */
-class NotificationCacheListener extends CacheListenerAdapter<NotificationKey, Notification> {
+public class NotificationCacheListener implements CacheListener<NotificationKey, Notification> {
 
-  private final NotificationHubClient notificationHubClient;
+  /**
+   * Client used to forward received notification entry events to the manager node.
+   */
+  private NotificationHubClient notifClient;
 
-  NotificationCacheListener(MBeanProxyFactory mBeanProxyFactory) {
-    this(new NotificationHubClient(mBeanProxyFactory));
-  }
+  private volatile boolean readyForEvents;
 
-  @VisibleForTesting
-  NotificationCacheListener(NotificationHubClient notificationHubClient) {
-    this.notificationHubClient = notificationHubClient;
+  public NotificationCacheListener(MBeanProxyFactory proxyHelper) {
+
+    notifClient = new NotificationHubClient(proxyHelper);
+    this.readyForEvents = false;
+
   }
 
   @Override
   public void afterCreate(EntryEvent<NotificationKey, Notification> event) {
-    notificationHubClient.sendNotification(event);
+    if (!readyForEvents) {
+      return;
+    }
+    notifClient.sendNotification(event);
+
+  }
+
+  @Override
+  public void afterDestroy(EntryEvent<NotificationKey, Notification> event) {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void afterInvalidate(EntryEvent<NotificationKey, Notification> event) {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void afterRegionClear(RegionEvent<NotificationKey, Notification> event) {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void afterRegionCreate(RegionEvent<NotificationKey, Notification> event) {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void afterRegionDestroy(RegionEvent<NotificationKey, Notification> event) {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void afterRegionInvalidate(RegionEvent<NotificationKey, Notification> event) {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void afterRegionLive(RegionEvent<NotificationKey, Notification> event) {
+    // TODO Auto-generated method stub
+
   }
 
   @Override
   public void afterUpdate(EntryEvent<NotificationKey, Notification> event) {
-    notificationHubClient.sendNotification(event);
+    if (!readyForEvents) {
+      return;
+    }
+    notifClient.sendNotification(event);
+
   }
+
+  @Override
+  public void close() {
+    // TODO Auto-generated method stub
+
+  }
+
+  public void markReady() {
+    readyForEvents = true;
+  }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionMessageTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionMessageTest.java
index 91027e1..7bc854a 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionMessageTest.java
@@ -14,10 +14,14 @@
  */
 package org.apache.geode.distributed.internal;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -35,4 +39,12 @@
 
     verify(mockDistributionMessage, times(1)).setReplySender(mockReplySender);
   }
+
+  @Test
+  public void membershipMessengerThreadsAreRecognized() {
+    List<String> threadNames = Arrays.asList("unicast receiver", "multicast receiver", "Geode UDP");
+    for (String threadName : threadNames) {
+      assertThat(DistributionMessage.isMembershipMessengerThreadName(threadName)).isTrue();
+    }
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessageTest.java
new file mode 100644
index 0000000..1ddd31e
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessageTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.inet.LocalHostUtil;
+
+public class LatestLastAccessTimeMessageTest {
+
+  @Test
+  public void processMessageShouldLookForNullCache() throws Exception {
+    final DistributionManager distributionManager = mock(DistributionManager.class);
+    final LatestLastAccessTimeReplyProcessor replyProcessor =
+        mock(LatestLastAccessTimeReplyProcessor.class);
+    final InternalDistributedRegion region = mock(InternalDistributedRegion.class);
+    Set<InternalDistributedMember> recipients = Collections.singleton(new InternalDistributedMember(
+        LocalHostUtil.getLocalHost(), 1234));
+    final LatestLastAccessTimeMessage<String> lastAccessTimeMessage =
+        new LatestLastAccessTimeMessage<>(replyProcessor, recipients, region, "foo");
+    lastAccessTimeMessage.process(mock(ClusterDistributionManager.class));
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/ManagementCacheListenerTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/ManagementCacheListenerTest.java
deleted file mode 100644
index 1564331..0000000
--- a/geode-core/src/test/java/org/apache/geode/management/internal/ManagementCacheListenerTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.management.internal;
-
-import static org.apache.geode.internal.cache.util.UncheckedUtils.cast;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.same;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import javax.management.ObjectName;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.mockito.junit.MockitoJUnit;
-import org.mockito.junit.MockitoRule;
-import org.mockito.quality.Strictness;
-
-import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.Region;
-import org.apache.geode.distributed.DistributedMember;
-
-/**
- * Unit tests for {@link ManagementCacheListener} (ie the SUT). These are characterization tests
- * that define behavior for an existing class. Test method names specify the SUT method, the result
- * of invoking that method, and whether or not there are any specific preconditions.
- */
-public class ManagementCacheListenerTest {
-
-  private static final String OBJECT_NAME_KEY =
-      "GemFire:service=DiskStore,name=cluster_config,type=Member,member=locator1";
-
-  private DistributedMember distributedMember;
-  private MBeanProxyFactory mBeanProxyFactory;
-  private EntryEvent<String, Object> monitoringEvent;
-  private Object monitoringEventNewValue;
-  private Object monitoringEventOldValue;
-  private ObjectName objectName;
-  private ProxyInfo proxyInfo;
-  private ProxyInterface proxyInterface;
-  private Region<String, Object> region;
-
-  private ManagementCacheListener managementCacheListener;
-
-  @Rule
-  public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
-
-  @Before
-  public void setUp() throws Exception {
-    distributedMember = mock(DistributedMember.class);
-    mBeanProxyFactory = mock(MBeanProxyFactory.class);
-    monitoringEvent = cast(mock(EntryEvent.class));
-    monitoringEventNewValue = new Object();
-    monitoringEventOldValue = new Object();
-    objectName = ObjectName.getInstance(OBJECT_NAME_KEY);
-    proxyInfo = mock(ProxyInfo.class);
-    proxyInterface = mock(ProxyInterface.class);
-    region = cast(mock(Region.class));
-
-    when(monitoringEvent.getKey())
-        .thenReturn(OBJECT_NAME_KEY);
-
-    managementCacheListener = new ManagementCacheListener(mBeanProxyFactory);
-  }
-
-  @Test
-  public void afterCreate_createsProxy() {
-    when(monitoringEvent.getDistributedMember())
-        .thenReturn(distributedMember);
-    when(monitoringEvent.getNewValue())
-        .thenReturn(monitoringEventNewValue);
-    when(monitoringEvent.getRegion())
-        .thenReturn(region);
-
-    managementCacheListener.afterCreate(monitoringEvent);
-
-    verify(mBeanProxyFactory)
-        .createProxy(
-            same(distributedMember),
-            eq(objectName),
-            same(region),
-            same(monitoringEventNewValue));
-  }
-
-  @Test
-  public void afterDestroy_removesProxy() {
-    when(monitoringEvent.getDistributedMember())
-        .thenReturn(distributedMember);
-    when(monitoringEvent.getOldValue())
-        .thenReturn(monitoringEventOldValue);
-
-    managementCacheListener.afterDestroy(monitoringEvent);
-
-    verify(mBeanProxyFactory)
-        .removeProxy(
-            same(distributedMember),
-            eq(objectName),
-            same(monitoringEventOldValue));
-  }
-
-  @Test
-  public void afterUpdate_updatesProxy_ifProxyExists() {
-    when(mBeanProxyFactory.findProxyInfo(eq(objectName)))
-        .thenReturn(proxyInfo);
-    when(monitoringEvent.getNewValue())
-        .thenReturn(monitoringEventNewValue);
-    when(monitoringEvent.getOldValue())
-        .thenReturn(monitoringEventOldValue);
-    when(proxyInfo.getProxyInstance())
-        .thenReturn(proxyInterface);
-
-    managementCacheListener.afterUpdate(monitoringEvent);
-
-    verify(mBeanProxyFactory)
-        .updateProxy(
-            eq(objectName),
-            same(proxyInfo),
-            same(monitoringEventNewValue),
-            same(monitoringEventOldValue));
-  }
-
-  @Test
-  public void afterUpdate_updatesLastRefreshedTime_onProxyInterface_ifProxyExists() {
-    when(mBeanProxyFactory.findProxyInfo(eq(objectName)))
-        .thenReturn(proxyInfo);
-    when(proxyInfo.getProxyInstance())
-        .thenReturn(proxyInterface);
-
-    managementCacheListener.afterUpdate(monitoringEvent);
-
-    verify(proxyInterface)
-        .setLastRefreshedTime(anyLong());
-  }
-
-  @Test
-  public void afterUpdate_doesNothing_ifProxyDoesNotExist() {
-    managementCacheListener.afterUpdate(monitoringEvent);
-
-    verify(mBeanProxyFactory, times(0))
-        .updateProxy(any(), any(), any(), any());
-  }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/NotificationCacheListenerTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/NotificationCacheListenerTest.java
deleted file mode 100644
index 151f5fe..0000000
--- a/geode-core/src/test/java/org/apache/geode/management/internal/NotificationCacheListenerTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.management.internal;
-
-import static org.apache.geode.internal.cache.util.UncheckedUtils.cast;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import javax.management.Notification;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.mockito.junit.MockitoJUnit;
-import org.mockito.junit.MockitoRule;
-import org.mockito.quality.Strictness;
-
-import org.apache.geode.cache.EntryEvent;
-
-/**
- * Unit tests for {@link NotificationCacheListener} (ie the SUT). These are characterization tests
- * that define behavior for an existing class. Test method names specify the SUT method and the
- * result of invoking that method.
- */
-public class NotificationCacheListenerTest {
-
-  private NotificationHubClient notificationHubClient;
-  private EntryEvent<NotificationKey, Notification> notificationEntryEvent;
-
-  @Rule
-  public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
-
-  @Before
-  public void setUp() {
-    notificationHubClient = mock(NotificationHubClient.class);
-    notificationEntryEvent = cast(mock(EntryEvent.class));
-  }
-
-  @Test
-  public void afterCreate_notifiesNotificationHubClient() {
-    NotificationCacheListener notificationCacheListener =
-        new NotificationCacheListener(notificationHubClient);
-
-    notificationCacheListener.afterCreate(notificationEntryEvent);
-
-    verify(notificationHubClient).sendNotification(notificationEntryEvent);
-  }
-
-  @Test
-  public void afterUpdate_notifiesNotificationHubClient() {
-    NotificationCacheListener notificationCacheListener =
-        new NotificationCacheListener(notificationHubClient);
-
-    notificationCacheListener.afterUpdate(notificationEntryEvent);
-
-    verify(notificationHubClient).sendNotification(notificationEntryEvent);
-  }
-}
diff --git a/geode-docs/configuring/running/cluster-management-service.html.md.erb b/geode-docs/configuring/running/cluster-management-service.html.md.erb
index 4a53fdb..f5ada9d 100644
--- a/geode-docs/configuring/running/cluster-management-service.html.md.erb
+++ b/geode-docs/configuring/running/cluster-management-service.html.md.erb
@@ -73,3 +73,19 @@
 
 The cluster management service is available to Java clients via a Java API. To enable the cluster management service Java API, include `geode-management.jar` in your client classpath.
 See the [Cluster Management Service Java Client topic on the Geode Wiki](https://cwiki.apache.org/confluence/display/GEODE/Cluster+Management+Service#ClusterManagementService-JavaClient) for more details.
+
+## <a id='cms-security'>Authentication and Authorization</a>
+
+The cluster management service REST API is secured by the <code class="ph codeph">security-manager</code> of your cluster. If a security-manager is specified, by default, you will
+need to pass in a username/password pair in order to invoke the REST API.
+
+```
+curl --user username:password http://example.com/management/regions
+```
+
+When <code class="ph codeph">security-auth-token-enabled-components</code> is set to "all" or "management", you will
+need to pass in a valid bearer token in your request header in order to invoke the REST API.
+
+```
+curl -H "Authorization: Bearer YWhhbWlsdG9uQGFwaWdlZS5jb206bXlwYXNzdzByZAo" http://example.com/management/regions
+```
\ No newline at end of file
diff --git a/geode-docs/reference/topics/gemfire_properties.html.md.erb b/geode-docs/reference/topics/gemfire_properties.html.md.erb
index fa7531b..f681d3f 100644
--- a/geode-docs/reference/topics/gemfire_properties.html.md.erb
+++ b/geode-docs/reference/topics/gemfire_properties.html.md.erb
@@ -190,18 +190,24 @@
 <td>300</td>
 </tr>
 <tr>
-<td>enable-network-partition-detection</td>
-<td>Boolean instructing the system to detect and handle splits in the cluster, typically caused by a partitioning of the network (split brain) where the cluster is running. You must set this property to the same value across all your cluster members. In addition, this property must be set to <code class="ph codeph">true</code> if you are using persistent regions and configure your regions to use DISTRIBUTED_ACK or GLOBAL scope to avoid potential data conflicts.</td>
-<td>S, L</td>
+<td>enable-cluster-configuration</td>
+<td>A value of &quot;true&quot; causes the creation of cluster configuration on dedicated locators. The cluster configuration service on dedicated locator(s) with this property set to &quot;true&quot; serves the configuration to new members joining the cluster and also saves the configuration changes caused by the <code class="ph codeph">gfsh</code> commands. This property is applicable only to dedicated locators.</td>
+<td>L</td>
 <td>true</td>
 </tr>
 <tr>
-<td>enable-cluster-configuration</td>
-<td>A value of &quot;true&quot; causes the creation of cluster configuration on dedicated locators. The cluster configuration service on dedicated locator(s) with this property set to &quot;true&quot; would serve the configuration to new members joining the cluster and also save the configuration changes caused by the <code class="ph codeph">gfsh</code> commands. This property is only applicable to dedicated locators..</td>
+<td>enable-management-rest-service</td>
+<td>A value of &quot;true&quot; enables the cluster management REST service. This service requires that the cluster configuration service also be enabled. If <code class="ph codeph">enable-cluster-configuration</code> is false and this is true, the management REST service would do nothing but report an error if you attempt to use it. Also, even if this property is set to true, the management REST service will be started only if <code class="ph codeph">http-service-port</code> is not &quot;0&quot;.</td>
 <td>L</td>
 <td>true</td>
 </tr>
 <tr>
+<td>enable-network-partition-detection</td>
+<td>Boolean instructing the system to detect and handle splits in the cluster, typically caused by a partitioning of the network (split brain) where the cluster is running. You must set this property to the same value across all your cluster members. In addition, this property must be set to <code class="ph codeph">true</code> if you are using persistent regions and configure your regions to use DISTRIBUTED_ACK or GLOBAL scope to avoid potential data conflicts.</td>
+<td>S, L</td>
+<td>true</td>
+</tr>
+<tr>
 <td>enable-time-statistics</td>
 <td>Boolean instructing the system to track time-based statistics for the cluster and caching. Disabled by default for performance reasons and not recommended for production environments. You must also configure <code class="ph codeph">statistic-sampling-enabled</code> to true and specify a <code class="ph codeph">statistic-archive-file</code>.</td>
 <td>S, L</td>
@@ -518,6 +524,17 @@
 <td><em>not set</em></td>
 </tr>
 <tr>
+<td>security-auth-token-enabled-components</td>
+<td>A comma-delimited list of component names which works in conjunction with the <code class="ph codeph">security-manager</code> property. If the security manager is enabled, this property will determine which components will use token-based authentication instead of basic (username/password) authentication.
+<p>Valid values are: "all", "management", "pulse"</p>
+<p>"all": shorthand for all the security components that support token authentication.</p>
+<p>"management": the management rest service.</p>
+<p>"pulse": the Pulse web app.</p>
+</td>
+<td>L</td>
+<td><em>""</em></td>
+</tr>
+<tr>
 <td>security-client-accessor-pp</td>
 <td><b>Deprecated.</b> Used for authorization. The callback that should be invoked in the post-operation phase, which is when the operation has completed on the server but before the result is sent to the client. The post-operation callback is also invoked for the updates that are sent from server to client through the notification channel.</td>
 <td>S, L</td>
diff --git a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/serializable/SerializableTemporaryFolder.java b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/serializable/SerializableTemporaryFolder.java
index 814475a..ea4f806 100755
--- a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/serializable/SerializableTemporaryFolder.java
+++ b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/serializable/SerializableTemporaryFolder.java
@@ -50,7 +50,7 @@
   private final AtomicBoolean passed = new AtomicBoolean(true);
   private final AtomicBoolean delete = new AtomicBoolean(true);
   private final AtomicReference<File> copyTo = new AtomicReference<>();
-  private final AtomicReference<When> when = new AtomicReference<>(When.FAILS);
+  private final AtomicReference<When> when = new AtomicReference<>();
   private final AtomicReference<String> methodName = new AtomicReference<>();
 
   /**
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberDataJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberDataJUnitTest.java
index b50dcf2..592465e 100644
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberDataJUnitTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberDataJUnitTest.java
@@ -14,11 +14,7 @@
  */
 package org.apache.geode.distributed.internal.membership.gms;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -35,6 +31,7 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.internal.inet.LocalHostUtil;
 import org.apache.geode.internal.serialization.BufferDataOutputStream;
 import org.apache.geode.internal.serialization.DSFIDSerializer;
 import org.apache.geode.internal.serialization.DSFIDSerializerFactory;
@@ -58,13 +55,13 @@
   @Test
   public void testEqualsNotSameType() {
     GMSMemberData member = new GMSMemberData();
-    assertFalse(member.equals("Not a GMSMemberData"));
+    assertThat(member).isNotEqualTo("Not a GMSMemberData");
   }
 
   @Test
   public void testEqualsIsSame() {
     GMSMemberData member = new GMSMemberData();
-    assertTrue(member.equals(member));
+    assertThat(member).isEqualTo(member);
   }
 
   @Test
@@ -72,7 +69,7 @@
     GMSMemberData member = new GMSMemberData();
     UUID uuid = new UUID(0, 0);
     member.setUUID(uuid);
-    assertEquals(0, member.compareTo(member));
+    assertThat(member.compareTo(member)).isZero();
   }
 
   private GMSMemberData createGMSMember(byte[] inetAddress, int viewId, long msb, long lsb) {
@@ -89,7 +86,7 @@
   public void testCompareToInetAddressIsLongerThan() {
     GMSMemberData member1 = createGMSMember(new byte[] {1, 1, 1, 1, 1}, 1, 1, 1);
     GMSMemberData member2 = createGMSMember(new byte[] {1, 1, 1, 1}, 1, 1, 1);
-    assertEquals(1, member1.compareTo(member2));
+    assertThat(member1.compareTo(member2)).isGreaterThan(0);
   }
 
   @Test
@@ -100,7 +97,7 @@
             member1.getVersionOrdinal(),
             member1.getUuidMostSignificantBits(), member1.getUuidLeastSignificantBits(),
             member1.getVmViewId());
-    assertEquals(0, member1.compareTo(member2));
+    assertThat(member1.compareTo(member2)).isZero();
   }
 
   @Test
@@ -109,70 +106,70 @@
     GMSMemberData member2 = new GMSMemberData(member1.getInetAddress(), member1.getMembershipPort(),
         member1.getVersionOrdinal(), member1.getUuidMostSignificantBits(),
         member1.getUuidLeastSignificantBits(), 100);
-    assertEquals(false, member1.equals(member2));
+    assertThat(member1).isNotEqualTo(member2);
   }
 
   @Test
   public void testCompareToInetAddressIsShorterThan() {
     GMSMemberData member1 = createGMSMember(new byte[] {1, 1, 1, 1}, 1, 1, 1);
     GMSMemberData member2 = createGMSMember(new byte[] {1, 1, 1, 1, 1}, 1, 1, 1);
-    assertEquals(-1, member1.compareTo(member2));
+    assertThat(member1.compareTo(member2)).isLessThan(0);
   }
 
   @Test
   public void testCompareToInetAddressIsGreater() {
     GMSMemberData member1 = createGMSMember(new byte[] {1, 2, 1, 1, 1}, 1, 1, 1);
     GMSMemberData member2 = createGMSMember(new byte[] {1, 1, 1, 1, 1}, 1, 1, 1);
-    assertEquals(1, member1.compareTo(member2));
+    assertThat(member1.compareTo(member2)).isGreaterThan(0);
   }
 
   @Test
   public void testCompareToInetAddressIsLessThan() {
     GMSMemberData member1 = createGMSMember(new byte[] {1, 1, 1, 1, 1}, 1, 1, 1);
     GMSMemberData member2 = createGMSMember(new byte[] {1, 2, 1, 1, 1}, 1, 1, 1);
-    assertEquals(-1, member1.compareTo(member2));
+    assertThat(member1.compareTo(member2)).isLessThan(0);
   }
 
   @Test
   public void testCompareToMyViewIdLarger() {
     GMSMemberData member1 = createGMSMember(new byte[] {1}, 2, 1, 1);
     GMSMemberData member2 = createGMSMember(new byte[] {1}, 1, 1, 1);
-    assertEquals(1, member1.compareTo(member2));
+    assertThat(member1.compareTo(member2)).isGreaterThan(0);
   }
 
   @Test
   public void testCompareToTheirViewIdLarger() {
     GMSMemberData member1 = createGMSMember(new byte[] {1}, 1, 1, 1);
     GMSMemberData member2 = createGMSMember(new byte[] {1}, 2, 1, 1);
-    assertEquals(-1, member1.compareTo(member2));
+    assertThat(member1.compareTo(member2)).isLessThan(0);
   }
 
   @Test
   public void testCompareToMyMSBLarger() {
     GMSMemberData member1 = createGMSMember(new byte[] {1}, 1, 2, 1);
     GMSMemberData member2 = createGMSMember(new byte[] {1}, 1, 1, 1);
-    assertEquals(1, member1.compareTo(member2));
+    assertThat(member1.compareTo(member2)).isGreaterThan(0);
   }
 
   @Test
   public void testCompareToTheirMSBLarger() {
     GMSMemberData member1 = createGMSMember(new byte[] {1}, 1, 1, 1);
     GMSMemberData member2 = createGMSMember(new byte[] {1}, 1, 2, 1);
-    assertEquals(-1, member1.compareTo(member2));
+    assertThat(member1.compareTo(member2)).isLessThan(0);
   }
 
   @Test
   public void testCompareToMyLSBLarger() {
     GMSMemberData member1 = createGMSMember(new byte[] {1}, 1, 1, 2);
     GMSMemberData member2 = createGMSMember(new byte[] {1}, 1, 1, 1);
-    assertEquals(1, member1.compareTo(member2));
+    assertThat(member1.compareTo(member2)).isGreaterThan(0);
   }
 
   @Test
   public void testCompareToTheirLSBLarger() {
     GMSMemberData member1 = createGMSMember(new byte[] {1}, 1, 1, 1);
     GMSMemberData member2 = createGMSMember(new byte[] {1}, 1, 1, 2);
-    assertEquals(-1, member1.compareTo(member2));
+    assertThat(member1.compareTo(member2)).isLessThan(0);
   }
 
   @Test
@@ -180,7 +177,7 @@
     GMSMemberData member = new GMSMemberData();
     UUID uuid = new UUID(0, 0);
     member.setUUID(uuid);
-    assertNull(member.getUUID());
+    assertThat(member.getUUID()).isNull();
   }
 
   @Test
@@ -188,7 +185,7 @@
     GMSMemberData member = new GMSMemberData();
     UUID uuid = new UUID(1, 1);
     member.setUUID(uuid);
-    assertNotNull(member.getUUID());
+    assertThat(member.getUUID()).isNotNull();
   }
 
   /**
@@ -204,6 +201,7 @@
   public void testGMSMemberBackwardCompatibility() throws Exception {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     GMSMemberData member = new GMSMemberData();
+    member.setInetAddr(LocalHostUtil.getLocalHost());
     DataOutput dataOutput = new DataOutputStream(baos);
     SerializationContext serializationContext = dsfidSerializer
         .createSerializationContext(dataOutput);
@@ -216,7 +214,10 @@
         .createDeserializationContext(dataInput);
     GMSMemberData newMember = new GMSMemberData();
     newMember.readEssentialData(dataInput, deserializationContext);
-    assertEquals(member.getVmKind(), newMember.getVmKind());
+    assertThat(newMember.getVmKind()).isEqualTo(member.getVmKind());
+    assertThat(newMember.getInetAddress()).isNotNull();
+    assertThat(newMember.getInetAddress().getHostAddress()).isEqualTo(newMember.getHostName());
+
 
     // vmKind should not be transmitted to a member with version GFE_90 or earlier
     dataOutput = new BufferDataOutputStream(Version.GFE_90);
@@ -227,7 +228,7 @@
     dataInput = new VersionedDataInputStream(stream, Version.GFE_90);
     newMember = new GMSMemberData();
     newMember.readEssentialData(dataInput, deserializationContext);
-    assertEquals(0, newMember.getVmKind());
+    assertThat(newMember.getVmKind()).isZero();
   }
 
 
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
index 0e19f70..49b8b3a 100644
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
@@ -28,9 +28,6 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -61,11 +58,7 @@
 import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger;
-import org.apache.geode.distributed.internal.membership.gms.messages.AbstractGMSMessage;
 import org.apache.geode.internal.serialization.DSFIDSerializer;
-import org.apache.geode.internal.serialization.DeserializationContext;
-import org.apache.geode.internal.serialization.SerializationContext;
-import org.apache.geode.internal.serialization.Version;
 import org.apache.geode.internal.serialization.internal.DSFIDSerializerImpl;
 import org.apache.geode.test.junit.categories.MembershipTest;
 
@@ -335,27 +328,4 @@
     assertThat(spy.getStartupEvents()).isEmpty();
   }
 
-  public static class TestMessage extends AbstractGMSMessage {
-
-    @Override
-    public int getDSFID() {
-      return HIGH_PRIORITY_ACKED_MESSAGE;
-    }
-
-    @Override
-    public void toData(DataOutput out, SerializationContext context) throws IOException {
-
-    }
-
-    @Override
-    public void fromData(DataInput in, DeserializationContext context)
-        throws IOException, ClassNotFoundException {
-
-    }
-
-    @Override
-    public Version[] getSerializationVersions() {
-      return null;
-    }
-  }
 }
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipIntegrationTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipIntegrationTest.java
index 0e823bd..fcc15b2 100644
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipIntegrationTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipIntegrationTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.distributed.internal.membership.gms;
 
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
@@ -69,6 +70,7 @@
   @Test
   public void oneMembershipCanStartWithALocator()
       throws IOException, MemberStartupException {
+
     final MembershipLocator<MemberIdentifier> locator = createLocator(0);
     locator.start();
 
@@ -77,13 +79,18 @@
     start(membership);
 
     assertThat(membership.getView().getMembers()).hasSize(1);
+
+    stop(membership);
+    stop(locator);
   }
 
   @Test
   public void twoMembershipsCanStartWithOneLocator()
       throws IOException, MemberStartupException {
+
     final MembershipLocator<MemberIdentifier> locator = createLocator(0);
     locator.start();
+
     final int locatorPort = locator.getPort();
 
     final Membership<MemberIdentifier> membership1 = createMembership(locator, locatorPort);
@@ -92,8 +99,14 @@
     final Membership<MemberIdentifier> membership2 = createMembership(null, locatorPort);
     start(membership2);
 
-    assertThat(membership1.getView().getMembers()).hasSize(2);
-    assertThat(membership2.getView().getMembers()).hasSize(2);
+    await().untilAsserted(
+        () -> assertThat(membership1.getView().getMembers()).hasSize(2));
+
+    await().untilAsserted(
+        () -> assertThat(membership2.getView().getMembers()).hasSize(2));
+
+    stop(membership1, membership2);
+    stop(locator);
   }
 
   @Test
@@ -102,6 +115,7 @@
 
     final MembershipLocator<MemberIdentifier> locator1 = createLocator(0);
     locator1.start();
+
     final int locatorPort1 = locator1.getPort();
 
     Membership<MemberIdentifier> membership1 = createMembership(locator1, locatorPort1);
@@ -109,14 +123,20 @@
 
     final MembershipLocator<MemberIdentifier> locator2 = createLocator(0, locatorPort1);
     locator2.start();
+
     final int locatorPort2 = locator2.getPort();
 
     Membership<MemberIdentifier> membership2 =
         createMembership(locator2, locatorPort1, locatorPort2);
     start(membership2);
 
-    assertThat(membership1.getView().getMembers()).hasSize(2);
-    assertThat(membership2.getView().getMembers()).hasSize(2);
+    await().untilAsserted(
+        () -> assertThat(membership1.getView().getMembers()).hasSize(2));
+    await().untilAsserted(
+        () -> assertThat(membership2.getView().getMembers()).hasSize(2));
+
+    stop(membership2, membership1);
+    stop(locator2, locator1);
   }
 
   @Test
@@ -125,6 +145,7 @@
 
     final MembershipLocator<MemberIdentifier> locator1 = createLocator(0);
     locator1.start();
+
     final int locatorPort1 = locator1.getPort();
 
     final Membership<MemberIdentifier> membership1 = createMembership(locator1, locatorPort1);
@@ -132,17 +153,23 @@
 
     final MembershipLocator<MemberIdentifier> locator2 = createLocator(0, locatorPort1);
     locator2.start();
+
     int locatorPort2 = locator2.getPort();
 
     // Force the next membership to use locator2 by stopping locator1
-    locator1.stop();
+    stop(locator1);
 
     Membership<MemberIdentifier> membership2 =
         createMembership(locator2, locatorPort1, locatorPort2);
     start(membership2);
 
-    assertThat(membership1.getView().getMembers()).hasSize(2);
-    assertThat(membership2.getView().getMembers()).hasSize(2);
+    await().untilAsserted(
+        () -> assertThat(membership1.getView().getMembers()).hasSize(2));
+    await().untilAsserted(
+        () -> assertThat(membership2.getView().getMembers()).hasSize(2));
+
+    stop(membership2, membership1);
+    stop(locator2, locator1);
   }
 
   private void start(final Membership<MemberIdentifier> membership)
@@ -218,4 +245,11 @@
         .create();
   }
 
+  private void stop(final Membership<MemberIdentifier>... memberships) {
+    Arrays.stream(memberships).forEach(membership -> membership.disconnect(false));
+  }
+
+  private void stop(final MembershipLocator<MemberIdentifier>... locators) {
+    Arrays.stream(locators).forEach(locator -> locator.stop());
+  }
 }
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/TestMessage.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/TestMessage.java
new file mode 100644
index 0000000..8da2dac
--- /dev/null
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/TestMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal.membership.gms;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.geode.distributed.internal.membership.gms.messages.AbstractGMSMessage;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.internal.serialization.Version;
+
+public class TestMessage extends AbstractGMSMessage {
+
+  @Override
+  public int getDSFID() {
+    return HIGH_PRIORITY_ACKED_MESSAGE;
+  }
+
+  @Override
+  public void toData(DataOutput out, SerializationContext context) throws IOException {
+
+  }
+
+  @Override
+  public void fromData(DataInput in, DeserializationContext context)
+      throws IOException, ClassNotFoundException {
+
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+}
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 0e0748d..fe52eb8 100755
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -25,6 +25,7 @@
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -61,6 +62,7 @@
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import org.apache.geode.distributed.internal.membership.api.MemberDisconnectedException;
@@ -74,6 +76,7 @@
 import org.apache.geode.distributed.internal.membership.gms.MemberIdentifierImpl;
 import org.apache.geode.distributed.internal.membership.gms.Services;
 import org.apache.geode.distributed.internal.membership.gms.Services.Stopper;
+import org.apache.geode.distributed.internal.membership.gms.TestMessage;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager;
@@ -823,6 +826,24 @@
     assertTrue(pinger.isPongMessage(m.getBuffer()));
   }
 
+  /**
+   * messages for the Manager that were queued by a quorum checker shouldn't be delivered to
+   * a Manager
+   */
+  @Test
+  public void testIgnoreManagerMessagesFromQuorumChecker() throws Exception {
+    initMocks(false);
+    MemberIdentifier memberIdentifier = createAddress(8888);
+    JGAddress jgAddress = new JGAddress(memberIdentifier);
+
+    ArgumentCaptor<Message> valueCapture = ArgumentCaptor.forClass(Message.class);
+    doNothing().when(manager).processMessage(valueCapture.capture());
+    org.jgroups.Message jgroupsMessage = messenger.createJGMessage(new TestMessage(), jgAddress,
+        memberIdentifier, Version.CURRENT_ORDINAL);
+    messenger.jgroupsReceiver.receive(jgroupsMessage, true);
+    assertThat(valueCapture.getAllValues()).isEmpty();
+  }
+
   @Test
   public void testJGroupsIOExceptionHandler() throws Exception {
     initMocks(false);
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberData.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberData.java
index b951191..ad00642 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberData.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberData.java
@@ -589,7 +589,9 @@
 
     this.inetAddr = StaticSerialization.readInetAddress(in);
     if (this.inetAddr != null) {
-      this.hostName = inetAddr.getHostName();
+      // use address as hostname at this level. getHostName() will do a reverse-dns lookup,
+      // which is very expensive
+      this.hostName = inetAddr.getHostAddress();
     }
     this.udpPort = in.readInt();
     this.vmViewId = in.readInt();
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index ca53906..da67d36 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -82,6 +82,7 @@
 import org.apache.geode.distributed.internal.membership.gms.GMSUtil;
 import org.apache.geode.distributed.internal.membership.gms.Services;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
+import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger;
 import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
@@ -178,7 +179,7 @@
    * The JGroupsReceiver is handed messages by the JGroups Channel. It is responsible
    * for deserializating and dispatching those messages to the appropriate handler
    */
-  private JGroupsReceiver jgroupsReceiver;
+  protected JGroupsReceiver jgroupsReceiver;
 
   public static void setChannelReceiver(JChannel channel, Receiver r) {
     try {
@@ -1266,7 +1267,7 @@
       receive(jgmsg, false);
     }
 
-    private void receive(org.jgroups.Message jgmsg, boolean fromQuorumChecker) {
+    protected void receive(org.jgroups.Message jgmsg, boolean fromQuorumChecker) {
       long startTime = services.getStatistics().startUDPDispatchRequest();
       try {
         if (services.getManager().shutdownInProgress()) {
@@ -1320,9 +1321,12 @@
           }
           filterIncomingMessage(msg);
           MessageHandler<Message<ID>> handler = getMessageHandler(msg);
-          if (fromQuorumChecker && handler instanceof HealthMonitor) {
+          if (fromQuorumChecker
+              && (handler instanceof HealthMonitor || handler instanceof Manager)) {
             // ignore suspect / heartbeat messages that happened during
-            // auto-reconnect because they very likely have old member IDs in them
+            // auto-reconnect because they very likely have old member IDs in them.
+            // Also ignore non-membership messages because we weren't a member when we received
+            // them.
           } else {
             handler.processMessage(msg);
           }
diff --git a/geode-web-api/src/integrationTest/java/org/apache/geode/rest/internal/web/controllers/RestAccessControllerTest.java b/geode-web-api/src/integrationTest/java/org/apache/geode/rest/internal/web/controllers/RestAccessControllerTest.java
index 383d9cc..73f183d 100644
--- a/geode-web-api/src/integrationTest/java/org/apache/geode/rest/internal/web/controllers/RestAccessControllerTest.java
+++ b/geode-web-api/src/integrationTest/java/org/apache/geode/rest/internal/web/controllers/RestAccessControllerTest.java
@@ -99,6 +99,8 @@
   private static final String ORDER_CAS_NEW_JSON = "order-cas-new.json";
   private static final String ORDER_CAS_WRONG_OLD_JSON = "order-cas-wrong-old.json";
 
+  private static final String SLASH = "/";
+
   private static Map<String, String> jsonResources = new HashMap<>();
 
   private static RequestPostProcessor POST_PROCESSOR = new StandardRequestPostProcessor();
@@ -106,7 +108,7 @@
   private MockMvc mockMvc;
 
   private static Region<?, ?> orderRegion;
-  private static Region<?, ?> customerRegion;
+  private static Region<String, PdxInstance> customerRegion;
 
   @ClassRule
   public static ServerStarterRule rule = new ServerStarterRule()
@@ -180,6 +182,25 @@
 
   @Test
   @WithMockUser
+  public void postEntryWithSlashKey() throws Exception {
+    String key = "1" + SLASH + "2";
+    mockMvc.perform(post("/v1/orders?key=" + key)
+        .content(jsonResources.get(ORDER1_JSON))
+        .with(POST_PROCESSOR))
+        .andExpect(status().isCreated())
+        .andExpect(header().string("Location", BASE_URL + "/orders/" + key));
+
+    mockMvc.perform(post("/v1/orders?key=" + key)
+        .content(jsonResources.get(ORDER1_JSON))
+        .with(POST_PROCESSOR))
+        .andExpect(status().isConflict());
+
+    Order order = (Order) ((PdxInstance) orderRegion.get(key)).getObject();
+    assertThat(order).as("order should not be null").isNotNull();
+  }
+
+  @Test
+  @WithMockUser
   public void postEntryWithJsonArrayOfOrders() throws Exception {
     mockMvc.perform(post("/v1/orders?key=1")
         .content(jsonResources.get(ORDER1_ARRAY_JSON))
@@ -200,6 +221,27 @@
 
   @Test
   @WithMockUser
+  public void postEntryWithSlashKeysAndJsonArrayOfOrders() throws Exception {
+    String key = "1" + SLASH + "2";
+    mockMvc.perform(post("/v1/orders?key=" + key)
+        .content(jsonResources.get(ORDER1_ARRAY_JSON))
+        .with(POST_PROCESSOR))
+        .andExpect(status().isCreated())
+        .andExpect(header().string("Location", BASE_URL + "/orders/" + key));
+
+    mockMvc.perform(post("/v1/orders?key=" + key)
+        .content(jsonResources.get(ORDER1_ARRAY_JSON))
+        .with(POST_PROCESSOR))
+        .andExpect(status().isConflict());
+
+    @SuppressWarnings("unchecked")
+    List<PdxInstance> entries = (List<PdxInstance>) orderRegion.get(key);
+    Order order = (Order) entries.get(0).getObject();
+    assertThat(order).as("order should not be null").isNotNull();
+  }
+
+  @Test
+  @WithMockUser
   public void failPostEntryWithInvalidJson() throws Exception {
     mockMvc.perform(post("/v1/orders?key=1")
         .content(jsonResources.get(MALFORMED_JSON))
@@ -271,6 +313,23 @@
 
   @Test
   @WithMockUser
+  public void putEntryWithSlashKey() throws Exception {
+    String key = "1" + SLASH + "2";
+    mockMvc.perform(put("/v1/orders/" + key)
+        .content(jsonResources.get(ORDER2_JSON))
+        .with(POST_PROCESSOR))
+        .andExpect(status().isOk())
+        .andExpect(header().string("Location", BASE_URL + "/orders/" + key));
+
+    mockMvc.perform(put("/v1/orders/" + key)
+        .content(jsonResources.get(ORDER2_JSON))
+        .with(POST_PROCESSOR))
+        .andExpect(status().isOk())
+        .andExpect(header().string("Location", BASE_URL + "/orders/" + key));
+  }
+
+  @Test
+  @WithMockUser
   public void failPutEntryWithInvalidJson() throws Exception {
     mockMvc.perform(put("/v1/orders/1")
         .content(jsonResources.get(MALFORMED_JSON))
@@ -306,13 +365,47 @@
   @Test
   @WithMockUser
   public void putAll() throws Exception {
+    StringBuilder keysBuilder = new StringBuilder();
+    for (int i = 1; i < 60; i++) {
+      keysBuilder.append(i).append(',');
+    }
+    keysBuilder.append(60);
+    String keys = keysBuilder.toString();
     mockMvc.perform(
-        put("/v1/customers/1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60")
+        put("/v1/customers/" + keys)
             .content(jsonResources.get(CUSTOMER_LIST_JSON))
             .with(POST_PROCESSOR))
         .andExpect(status().isOk())
-        .andExpect(header().string("Location", BASE_URL
-            + "/customers/1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60"));
+        .andExpect(header().string("Location", BASE_URL + "/customers/" + keys));
+    assertThat(customerRegion.size()).isEqualTo(60);
+    for (int i = 1; i <= 60; i++) {
+      PdxInstance customer = customerRegion.get(String.valueOf(i));
+      assertThat(customer.getField("customerId").toString())
+          .isEqualTo(Integer.valueOf(100 + i).toString());
+    }
+  }
+
+  @Test
+  @WithMockUser
+  public void putAllWithSlashes() throws Exception {
+    StringBuilder keysBuilder = new StringBuilder();
+    for (int i = 1; i < 60; i++) {
+      keysBuilder.append(i).append(SLASH).append(',');
+    }
+    keysBuilder.append(60).append(SLASH);
+    String keys = keysBuilder.toString();
+    mockMvc.perform(
+        put("/v1/customers/" + keys)
+            .content(jsonResources.get(CUSTOMER_LIST_JSON))
+            .with(POST_PROCESSOR))
+        .andExpect(status().isOk())
+        .andExpect(header().string("Location", BASE_URL + "/customers/" + keys));
+    assertThat(customerRegion.size()).isEqualTo(60);
+    for (int i = 1; i <= 60; i++) {
+      PdxInstance customer = customerRegion.get(String.valueOf(i) + SLASH);
+      assertThat(customer.getField("customerId").toString())
+          .isEqualTo(Integer.valueOf(100 + i).toString());
+    }
   }
 
   @Test
@@ -590,7 +683,7 @@
   @WithMockUser
   public void getSpecificKeys() throws Exception {
     putAll();
-    mockMvc.perform(get("/v1/customers/1,2,3,4,5")
+    mockMvc.perform(get("/v1/customers/1,2,3,4,5?ignoreMissingKey=false")
         .with(POST_PROCESSOR))
         .andExpect(status().isOk())
         .andExpect(
@@ -599,6 +692,18 @@
 
   @Test
   @WithMockUser
+  public void getSpecificKeysWithSlashes() throws Exception {
+    putAllWithSlashes();
+    mockMvc.perform(get("/v1/customers/1" + SLASH + ",2" + SLASH + ",3" + SLASH
+        + ",4" + SLASH + ",5" + SLASH)
+            .with(POST_PROCESSOR))
+        .andExpect(status().isOk())
+        .andExpect(
+            jsonPath("$.customers[*].customerId", containsInAnyOrder(101, 102, 103, 104, 105)));
+  }
+
+  @Test
+  @WithMockUser
   public void getSpecificKeysFromUnknownRegion() throws Exception {
     mockMvc.perform(get("/v1/unknown/1,2,3,4,5")
         .with(POST_PROCESSOR))
@@ -635,6 +740,16 @@
 
   @Test
   @WithMockUser
+  public void deleteMultipleKeysWithSlashes() throws Exception {
+    putAllWithSlashes();
+    mockMvc.perform(delete("/v1/customers/1" + SLASH + ",2" + SLASH + ",3"
+        + SLASH + ",4" + SLASH + ",5" + SLASH)
+            .with(POST_PROCESSOR))
+        .andExpect(status().isOk());
+  }
+
+  @Test
+  @WithMockUser
   public void deleteAllKeys() throws Exception {
     putAll();
     mockMvc.perform(delete("/v1/customers")
diff --git a/geode-web-api/src/main/java/org/apache/geode/rest/internal/web/controllers/AbstractBaseController.java b/geode-web-api/src/main/java/org/apache/geode/rest/internal/web/controllers/AbstractBaseController.java
index 7bc1bb0..3377063 100644
--- a/geode-web-api/src/main/java/org/apache/geode/rest/internal/web/controllers/AbstractBaseController.java
+++ b/geode-web-api/src/main/java/org/apache/geode/rest/internal/web/controllers/AbstractBaseController.java
@@ -28,6 +28,8 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.servlet.http.HttpServletRequest;
+
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonParser;
@@ -972,4 +974,16 @@
     targetedMembers.add(cache.getDistributedSystem().getDistributedMember());
     return targetedMembers;
   }
+
+  protected String[] parseKeys(HttpServletRequest request, String region) {
+    String uri = request.getRequestURI();
+    int regionIndex = uri.indexOf("/" + region + "/");
+    if (regionIndex == -1) {
+      throw new IllegalStateException(
+          String.format("Could not find the region (%1$s) in the URI (%2$s)", region, uri));
+    }
+    int keysIndex = regionIndex + region.length() + 2;
+    String keysString = uri.substring(keysIndex);
+    return keysString.split(",");
+  }
 }
diff --git a/geode-web-api/src/main/java/org/apache/geode/rest/internal/web/controllers/CommonCrudController.java b/geode-web-api/src/main/java/org/apache/geode/rest/internal/web/controllers/CommonCrudController.java
index 19fa789..4d85865 100644
--- a/geode-web-api/src/main/java/org/apache/geode/rest/internal/web/controllers/CommonCrudController.java
+++ b/geode-web-api/src/main/java/org/apache/geode/rest/internal/web/controllers/CommonCrudController.java
@@ -19,6 +19,8 @@
 import java.util.List;
 import java.util.Set;
 
+import javax.servlet.http.HttpServletRequest;
+
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
@@ -113,21 +115,21 @@
    * Delete data for single key or specific keys in region
    *
    * @param region gemfire region
-   * @param keys for which data is requested
    * @return JSON document containing result
    */
-  @RequestMapping(method = RequestMethod.DELETE, value = "/{region}/{keys}",
+  @RequestMapping(method = RequestMethod.DELETE, value = "/{region}/**",
       produces = {APPLICATION_JSON_UTF8_VALUE})
   @ApiOperation(value = "delete data for key(s)",
-      notes = "Delete data for single key or specific keys in region")
+      notes = "Delete data for one or more keys in a region. The keys, ** in the endpoint, are a comma separated list.")
   @ApiResponses({@ApiResponse(code = 200, message = "OK"),
       @ApiResponse(code = 401, message = "Invalid Username or Password."),
       @ApiResponse(code = 403, message = "Insufficient privileges for operation."),
       @ApiResponse(code = 404, message = "Region or key(s) does not exist"),
       @ApiResponse(code = 500, message = "GemFire throws an error or exception")})
-  @PreAuthorize("@securityService.authorize('WRITE', #region, #keys)")
   public ResponseEntity<?> delete(@PathVariable("region") String region,
-      @PathVariable("keys") final String[] keys) {
+      HttpServletRequest request) {
+    String[] keys = parseKeys(request, region);
+    securityService.authorize("WRITE", region, keys);
     logger.debug("Delete data for key {} on region {}", ArrayUtils.toString((Object[]) keys),
         region);
 
diff --git a/geode-web-api/src/main/java/org/apache/geode/rest/internal/web/controllers/PdxBasedCrudController.java b/geode-web-api/src/main/java/org/apache/geode/rest/internal/web/controllers/PdxBasedCrudController.java
index 3385e36..ce2ddd4 100644
--- a/geode-web-api/src/main/java/org/apache/geode/rest/internal/web/controllers/PdxBasedCrudController.java
+++ b/geode-web-api/src/main/java/org/apache/geode/rest/internal/web/controllers/PdxBasedCrudController.java
@@ -18,6 +18,8 @@
 import java.util.List;
 import java.util.Map;
 
+import javax.servlet.http.HttpServletRequest;
+
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
@@ -196,23 +198,23 @@
    * Reading data for set of keys
    *
    * @param region gemfire region name
-   * @param keys string containing comma separated keys
    * @return JSON document
    */
-  @RequestMapping(method = RequestMethod.GET, value = "/{region}/{keys}",
+  @RequestMapping(method = RequestMethod.GET, value = "/{region}/**",
       produces = APPLICATION_JSON_UTF8_VALUE)
   @ApiOperation(value = "read data for specific keys",
-      notes = "Read data for specific set of keys in region.")
+      notes = "Read data for a specific set of keys in a region. The keys, ** in the endpoint, are a comma separated list.")
   @ApiResponses({@ApiResponse(code = 200, message = "OK."),
       @ApiResponse(code = 400, message = "Bad Request."),
       @ApiResponse(code = 401, message = "Invalid Username or Password."),
       @ApiResponse(code = 403, message = "Insufficient privileges for operation."),
       @ApiResponse(code = 404, message = "Region does not exist."),
       @ApiResponse(code = 500, message = "GemFire throws an error or exception.")})
-  @PreAuthorize("@securityService.authorize('READ', #region, #keys)")
   public ResponseEntity<?> read(@PathVariable("region") String region,
-      @PathVariable("keys") final String[] keys,
-      @RequestParam(value = "ignoreMissingKey", required = false) final String ignoreMissingKey) {
+      @RequestParam(value = "ignoreMissingKey", required = false) final String ignoreMissingKey,
+      HttpServletRequest request) {
+    String[] keys = parseKeys(request, region);
+    securityService.authorize("READ", region, keys);
     logger.debug("Reading data for keys ({}) in Region ({})", ArrayUtils.toString(keys), region);
 
     final HttpHeaders headers = new HttpHeaders();
@@ -272,18 +274,18 @@
    * Update data for a key or set of keys
    *
    * @param region gemfire data region
-   * @param keys keys for which update operation is requested
    * @param opValue type of update (put, replace, cas etc)
    * @param json new data for the key(s)
    * @return JSON document
    */
-  @RequestMapping(method = RequestMethod.PUT, value = "/{region}/{keys}",
+  @RequestMapping(method = RequestMethod.PUT, value = "/{region}/**",
       consumes = {APPLICATION_JSON_UTF8_VALUE}, produces = {
           APPLICATION_JSON_UTF8_VALUE})
   @ApiOperation(value = "update data for key",
-      notes = "Update or insert (put) data for key in region."
-          + "op=REPLACE, update (replace) data with key if and only if the key exists in region"
-          + "op=CAS update (compare-and-set) value having key with a new value if and only if the \"@old\" value sent matches the current value for the key in region")
+      notes = "Update or insert (put) data for keys in a region."
+          + " The keys, ** in the endpoint, are a comma separated list."
+          + " If op=REPLACE, update (replace) data with key if and only if the key exists in the region."
+          + " If op=CAS update (compare-and-set) value having key with a new value if and only if the \"@old\" value sent matches the current value for the key in the region.")
   @ApiResponses({@ApiResponse(code = 200, message = "OK."),
       @ApiResponse(code = 400, message = "Bad Request."),
       @ApiResponse(code = 401, message = "Invalid Username or Password."),
@@ -293,11 +295,11 @@
       @ApiResponse(code = 409,
           message = "For CAS, @old value does not match to the current value in region"),
       @ApiResponse(code = 500, message = "GemFire throws an error or exception.")})
-  @PreAuthorize("@securityService.authorize('WRITE', #region, #keys)")
   public ResponseEntity<?> update(@PathVariable("region") String region,
-      @PathVariable("keys") final String[] keys,
       @RequestParam(value = "op", defaultValue = "PUT") final String opValue,
-      @RequestBody final String json) {
+      @RequestBody final String json, HttpServletRequest request) {
+    String[] keys = parseKeys(request, region);
+    securityService.authorize("WRITE", region, keys);
     logger.debug("updating key(s) for region ({}) ", region);
 
     region = decode(region);