GEODE-8696: Fix synchronization in FederatingManager (#5728)

Prevent hang while protecting against removal of member artifacts
during startup of manager.
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/CacheManagementDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/CacheManagementDUnitTest.java
index 4736161..7a1d52a 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/CacheManagementDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/CacheManagementDUnitTest.java
@@ -387,7 +387,7 @@
 
       for (DistributedMember member : otherMembers) {
         Set<ObjectName> proxyNames =
-            service.getFederatingManager().getProxyFactory().findAllProxies(member);
+            service.getFederatingManager().proxyFactory().findAllProxies(member);
         assertThat(proxyNames).isEmpty();
 
         ObjectName proxyMBeanName = service.getMemberMBeanName(member);
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanDistributedTest.java
index 555c981..b25fc7a 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanDistributedTest.java
@@ -114,15 +114,18 @@
   public void getMemberCount() {
     // 1 manager, 3 members, 1 dunit locator
     managerVM.invoke(() -> {
-      await()
-          .untilAsserted(() -> assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5));
+      await().untilAsserted(() -> {
+        assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5);
+      });
     });
   }
 
   @Test
   public void showJVMMetrics() {
     managerVM.invoke(() -> {
-      await().until(() -> distributedSystemMXBean.getMemberCount() == 5);
+      await().untilAsserted(() -> {
+        assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5);
+      });
 
       for (DistributedMember member : getOtherNormalMembers()) {
         assertThat(distributedSystemMXBean.showJVMMetrics(member.getName())).isNotNull();
@@ -133,7 +136,9 @@
   @Test
   public void showOSMetrics() {
     managerVM.invoke(() -> {
-      await().until(() -> distributedSystemMXBean.getMemberCount() == 5);
+      await().untilAsserted(() -> {
+        assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5);
+      });
 
       Set<InternalDistributedMember> otherMembers = getOtherNormalMembers();
       for (DistributedMember member : otherMembers) {
@@ -147,15 +152,18 @@
     managerVM.invoke(() -> {
       distributedSystemMXBean.shutDownAllMembers();
 
-      await().untilAsserted(() -> assertThat(getOtherNormalMembers()).hasSize(0));
+      await().untilAsserted(() -> {
+        assertThat(getOtherNormalMembers()).hasSize(0);
+      });
     });
   }
 
   @Test
   public void listMemberObjectNames() {
     managerVM.invoke(() -> {
-      await().untilAsserted(
-          () -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(4));
+      await().untilAsserted(() -> {
+        assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(4);
+      });
     });
   }
 
@@ -164,7 +172,9 @@
     managerVM.invoke(() -> {
       String memberName = distributedMember.getName();
 
-      await().until(() -> distributedSystemMXBean.fetchMemberObjectName(memberName) != null);
+      await().untilAsserted(() -> {
+        assertThat(distributedSystemMXBean.fetchMemberObjectName(memberName)).isNotNull();
+      });
 
       ObjectName memberMXBeanName = distributedSystemMXBean.fetchMemberObjectName(memberName);
       assertThat(memberMXBeanName).isEqualTo(getMemberMBeanName(memberName));
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/JmxLocatorReconnectDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/JmxLocatorReconnectDistributedTest.java
new file mode 100644
index 0000000..9d5678e
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/JmxLocatorReconnectDistributedTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management;
+
+import static java.lang.management.ManagementFactory.getPlatformMBeanServer;
+import static java.util.Arrays.asList;
+import static javax.management.ObjectName.getInstance;
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT;
+import static org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper.crashDistributedSystem;
+import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.VM.getVMId;
+import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.QueryExp;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.ForcedDisconnectException;
+import org.apache.geode.alerting.internal.spi.AlertingIOException;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.distributed.DistributedSystemDisconnectedException;
+import org.apache.geode.distributed.LocatorLauncher;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.distributed.internal.membership.api.MemberDisconnectedException;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedErrorCollector;
+import org.apache.geode.test.dunit.rules.DistributedReference;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.categories.JMXTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+
+@Category(JMXTest.class)
+@SuppressWarnings({"serial", "CodeBlock2Expr", "SameParameterValue"})
+public class JmxLocatorReconnectDistributedTest implements Serializable {
+
+  private static final String LOCATOR_NAME = "locator";
+  private static final String SERVER_NAME = "server";
+  private static final String REGION_NAME = "region";
+  private static final QueryExp QUERY_ALL = null;
+
+  private static final ObjectName GEMFIRE_MXBEANS =
+      execute(() -> getInstance("GemFire:*"));
+  private static final Set<ObjectName> EXPECTED_SERVER_MXBEANS =
+      execute(() -> expectedServerMXBeans(SERVER_NAME, REGION_NAME));
+  private static final Set<ObjectName> EXPECTED_LOCATOR_MXBEANS =
+      execute(() -> expectedLocatorMXBeans(LOCATOR_NAME));
+  private static final Set<ObjectName> EXPECTED_DISTRIBUTED_MXBEANS =
+      execute(() -> expectedDistributedMXBeans(REGION_NAME));
+
+  private VM locatorVM;
+  private VM serverVM;
+
+  private String locators;
+  private int locatorPort;
+  private int locatorJmxPort;
+  private Set<ObjectName> mxbeansOnServer;
+  private Set<ObjectName> mxbeansOnLocator;
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule(2);
+  @Rule
+  public DistributedErrorCollector errorCollector = new DistributedErrorCollector();
+  @Rule
+  public DistributedReference<LocatorLauncher> locatorLauncher = new DistributedReference<>();
+  @Rule
+  public DistributedReference<ServerLauncher> serverLauncher = new DistributedReference<>();
+  @Rule
+  public DistributedRestoreSystemProperties restoreProps = new DistributedRestoreSystemProperties();
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  @Before
+  public void setUp() throws Exception {
+    locatorVM = getVM(0);
+    serverVM = getVM(1);
+
+    File locatorDir = temporaryFolder.newFolder(LOCATOR_NAME);
+    File serverDir = temporaryFolder.newFolder(SERVER_NAME);
+
+    for (VM vm : asList(locatorVM, serverVM)) {
+      vm.invoke(() -> System.setProperty(GEMFIRE_PREFIX + "standard-output-always-on", "true"));
+    }
+
+    int[] port = getRandomAvailableTCPPorts(2);
+    locatorPort = port[0];
+    locatorJmxPort = port[1];
+    locators = "localhost[" + locatorPort + "]";
+
+    locatorVM.invoke(() -> {
+      locatorLauncher.set(startLocator(locatorDir, locatorPort, locatorJmxPort, locators));
+    });
+
+    serverVM.invoke(() -> serverLauncher.set(startServer(serverDir, locators)));
+
+    gfsh.connectAndVerify(locatorJmxPort, GfshCommandRule.PortType.jmxManager);
+
+    String createRegionCommand = "create region --type=REPLICATE --name=" + SEPARATOR + REGION_NAME;
+    gfsh.executeAndAssertThat(createRegionCommand).statusIsSuccess();
+
+    addIgnoredException(AlertingIOException.class);
+    addIgnoredException(CacheClosedException.class);
+    addIgnoredException(CancelException.class);
+    addIgnoredException(DistributedSystemDisconnectedException.class);
+    addIgnoredException(ForcedDisconnectException.class);
+    addIgnoredException(MemberDisconnectedException.class);
+    addIgnoredException("Possible loss of quorum");
+
+    mxbeansOnServer = serverVM.invoke(() -> {
+      await().untilAsserted(() -> {
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on server")
+            .containsAll(EXPECTED_SERVER_MXBEANS);
+      });
+      return getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL);
+    });
+
+    mxbeansOnLocator = locatorVM.invoke(() -> {
+      await().untilAsserted(() -> {
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on locator")
+            .containsAll(EXPECTED_SERVER_MXBEANS)
+            .containsAll(EXPECTED_LOCATOR_MXBEANS)
+            .containsAll(EXPECTED_DISTRIBUTED_MXBEANS);
+      });
+      return getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL);
+    });
+  }
+
+  @Test
+  public void serverMXBeanOnServerIsUnaffectedByLocatorCrash() {
+    locatorVM.invoke(() -> {
+      crashDistributedSystem(locatorLauncher.get().getCache().getDistributedSystem());
+
+      await().untilAsserted(() -> {
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on locator")
+            .isEmpty();
+      });
+    });
+
+    serverVM.invoke(() -> {
+      assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+          .as("GemFire mxbeans on server")
+          .containsExactlyInAnyOrderElementsOf(mxbeansOnServer);
+    });
+
+    locatorVM.invoke(() -> {
+      InternalLocator locator = (InternalLocator) locatorLauncher.get().getLocator();
+
+      await().untilAsserted(() -> {
+        boolean isReconnected = locator.isReconnected();
+        boolean isSharedConfigurationRunning = locator.isSharedConfigurationRunning();
+        Set<ObjectName> mbeanNames =
+            getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL);
+
+        assertThat(isReconnected)
+            .as("Locator is reconnected on locator")
+            .isTrue();
+        assertThat(isSharedConfigurationRunning)
+            .as("Locator shared configuration is running on locator")
+            .isTrue();
+        assertThat(mbeanNames)
+            .as("GemFire mxbeans on locator")
+            .isEqualTo(mxbeansOnLocator);
+      });
+    });
+
+    serverVM.invoke(() -> {
+      assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+          .as("GemFire mxbeans on server")
+          .containsExactlyInAnyOrderElementsOf(mxbeansOnServer);
+    });
+  }
+
+  private static LocatorLauncher startLocator(File workingDirectory, int locatorPort, int jmxPort,
+      String locators) {
+    LocatorLauncher locatorLauncher = new LocatorLauncher.Builder()
+        .setMemberName(LOCATOR_NAME)
+        .setPort(locatorPort)
+        .setWorkingDirectory(workingDirectory.getAbsolutePath())
+        .set(HTTP_SERVICE_PORT, "0")
+        .set(JMX_MANAGER, "true")
+        .set(JMX_MANAGER_PORT, String.valueOf(jmxPort))
+        .set(JMX_MANAGER_START, "true")
+        .set(LOCATORS, locators)
+        .set(LOG_FILE, new File(workingDirectory, LOCATOR_NAME + ".log").getAbsolutePath())
+        .set(MAX_WAIT_TIME_RECONNECT, "1000")
+        .set(MEMBER_TIMEOUT, "2000")
+        .build();
+
+    locatorLauncher.start();
+
+    InternalLocator locator = (InternalLocator) locatorLauncher.getLocator();
+
+    await().untilAsserted(() -> {
+      assertThat(locator.isSharedConfigurationRunning())
+          .as("Locator shared configuration is running on locator" + getVMId())
+          .isTrue();
+    });
+
+    return locatorLauncher;
+  }
+
+  private static ServerLauncher startServer(File workingDirectory, String locators) {
+    ServerLauncher serverLauncher = new ServerLauncher.Builder()
+        .setDisableDefaultServer(true)
+        .setMemberName(SERVER_NAME)
+        .setWorkingDirectory(workingDirectory.getAbsolutePath())
+        .set(HTTP_SERVICE_PORT, "0")
+        .set(LOCATORS, locators)
+        .set(LOG_FILE, new File(workingDirectory, SERVER_NAME + ".log").getAbsolutePath())
+        .set(MAX_WAIT_TIME_RECONNECT, "1000")
+        .set(MEMBER_TIMEOUT, "2000")
+        .build();
+
+    serverLauncher.start();
+
+    return serverLauncher;
+  }
+
+  private static Set<ObjectName> expectedServerMXBeans(String memberName, String regionName)
+      throws MalformedObjectNameException {
+    return new HashSet<>(asList(
+        getInstance("GemFire:type=Member,member=" + memberName),
+        getInstance("GemFire:service=Region,name=" + SEPARATOR + regionName +
+            ",type=Member,member=" + memberName)));
+  }
+
+  private static Set<ObjectName> expectedLocatorMXBeans(String memberName)
+      throws MalformedObjectNameException {
+    return new HashSet<>(asList(
+        getInstance("GemFire:service=DiskStore,name=cluster_config,type=Member,member=" +
+            memberName),
+        getInstance("GemFire:service=Locator,type=Member,member=" + memberName),
+        getInstance("GemFire:service=LockService,name=__CLUSTER_CONFIG_LS,type=Member,member=" +
+            memberName),
+        getInstance("GemFire:type=Member,member=" + memberName),
+        getInstance("GemFire:service=Manager,type=Member,member=" + memberName)));
+  }
+
+  private static Set<ObjectName> expectedDistributedMXBeans(String regionName)
+      throws MalformedObjectNameException {
+    return new HashSet<>(asList(
+        getInstance("GemFire:service=AccessControl,type=Distributed"),
+        getInstance("GemFire:service=FileUploader,type=Distributed"),
+        getInstance("GemFire:service=LockService,name=__CLUSTER_CONFIG_LS,type=Distributed"),
+        getInstance("GemFire:service=Region,name=" + SEPARATOR + regionName + ",type=Distributed"),
+        getInstance("GemFire:service=System,type=Distributed")));
+  }
+
+  private static <V> V execute(Callable<V> task) {
+    try {
+      return task.call();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/JMXMBeanReconnectDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/JmxServerReconnectDistributedTest.java
similarity index 63%
rename from geode-core/src/distributedTest/java/org/apache/geode/management/JMXMBeanReconnectDUnitTest.java
rename to geode-core/src/distributedTest/java/org/apache/geode/management/JmxServerReconnectDistributedTest.java
index 3969f07..ab1b46c 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/JMXMBeanReconnectDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/JmxServerReconnectDistributedTest.java
@@ -32,14 +32,12 @@
 import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
-import static org.apache.geode.test.dunit.Disconnect.disconnectAllFromDS;
 import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
 import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
 import static org.apache.geode.test.dunit.VM.getVM;
 import static org.apache.geode.test.dunit.VM.getVMId;
 import static org.apache.geode.test.dunit.VM.toArray;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
 
 import java.io.File;
 import java.io.Serializable;
@@ -47,11 +45,13 @@
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import javax.management.QueryExp;
 
 import org.junit.After;
 import org.junit.Before;
@@ -73,6 +73,7 @@
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.rules.DistributedErrorCollector;
+import org.apache.geode.test.dunit.rules.DistributedReference;
 import org.apache.geode.test.dunit.rules.DistributedRule;
 import org.apache.geode.test.junit.categories.JMXTest;
 import org.apache.geode.test.junit.rules.GfshCommandRule;
@@ -80,61 +81,66 @@
 import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
 
 @Category(JMXTest.class)
-@SuppressWarnings("serial")
-public class JMXMBeanReconnectDUnitTest implements Serializable {
+@SuppressWarnings({"serial", "CodeBlock2Expr", "SameParameterValue"})
+public class JmxServerReconnectDistributedTest implements Serializable {
 
-  private static final long TIMEOUT_MILLIS = getTimeout().toMillis();
-  private static final LocatorLauncher DUMMY_LOCATOR = mock(LocatorLauncher.class);
-  private static final ServerLauncher DUMMY_SERVER = mock(ServerLauncher.class);
+  private static final String LOCATOR_1_NAME = "locator1";
+  private static final String LOCATOR_2_NAME = "locator2";
+  private static final String SERVER_NAME = "server";
+  private static final String REGION_NAME = "region";
+  private static final QueryExp QUERY_ALL = null;
+
+  private static final ObjectName GEMFIRE_MXBEANS =
+      execute(() -> getInstance("GemFire:*"));
+  private static final Set<ObjectName> EXPECTED_SERVER_MXBEANS =
+      execute(() -> expectedServerMXBeans(SERVER_NAME, REGION_NAME));
+  private static final Set<ObjectName> EXPECTED_LOCATOR_1_MXBEANS =
+      execute(() -> expectedLocatorMXBeans(LOCATOR_1_NAME));
+  private static final Set<ObjectName> EXPECTED_LOCATOR_2_MXBEANS =
+      execute(() -> expectedLocatorMXBeans(LOCATOR_2_NAME));
+  private static final Set<ObjectName> EXPECTED_DISTRIBUTED_MXBEANS =
+      execute(() -> expectedDistributedMXBeans(REGION_NAME));
 
   private static final AtomicReference<CountDownLatch> BEFORE =
       new AtomicReference<>(new CountDownLatch(0));
   private static final AtomicReference<CountDownLatch> AFTER =
       new AtomicReference<>(new CountDownLatch(0));
 
-  private static final AtomicReference<LocatorLauncher> LOCATOR =
-      new AtomicReference<>(DUMMY_LOCATOR);
-  private static final AtomicReference<ServerLauncher> SERVER =
-      new AtomicReference<>(DUMMY_SERVER);
-
   private VM locator1VM;
   private VM locator2VM;
   private VM serverVM;
 
-  private String locator1Name;
-  private String locator2Name;
-  private String serverName;
   private String locators;
   private int locator1Port;
   private int locator2Port;
   private int locator1JmxPort;
   private int locator2JmxPort;
-  private String regionName;
   private Set<ObjectName> mxbeansOnServer;
   private Set<ObjectName> mxbeansOnLocator1;
   private Set<ObjectName> mxbeansOnLocator2;
 
   @Rule
-  public DistributedRule distributedRule = new DistributedRule();
-  @Rule
-  public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+  public DistributedRule distributedRule = new DistributedRule(3);
   @Rule
   public DistributedErrorCollector errorCollector = new DistributedErrorCollector();
   @Rule
+  public DistributedReference<LocatorLauncher> locatorLauncher = new DistributedReference<>();
+  @Rule
+  public DistributedReference<ServerLauncher> serverLauncher = new DistributedReference<>();
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+  @Rule
   public transient GfshCommandRule gfsh = new GfshCommandRule();
 
   @Before
   public void setUp() throws Exception {
-    locator1VM = getVM(1);
-    locator2VM = getVM(-1);
-    serverVM = getVM(0);
+    locator1VM = getVM(0);
+    locator2VM = getVM(1);
+    serverVM = getVM(2);
 
-    locator1Name = "locator1";
-    locator2Name = "locator2";
-    serverName = "server1";
-    File locator1Dir = temporaryFolder.newFolder(locator1Name);
-    File locator2Dir = temporaryFolder.newFolder(locator2Name);
-    File serverDir = temporaryFolder.newFolder(serverName);
+    File locator1Dir = temporaryFolder.newFolder(LOCATOR_1_NAME);
+    File locator2Dir = temporaryFolder.newFolder(LOCATOR_2_NAME);
+    File serverDir = temporaryFolder.newFolder(SERVER_NAME);
 
     int[] port = getRandomAvailableTCPPorts(4);
     locator1Port = port[0];
@@ -144,18 +150,19 @@
     locators = "localhost[" + locator1Port + "],localhost[" + locator2Port + "]";
 
     locator1VM.invoke(() -> {
-      startLocator(locator1Name, locator1Dir, locator1Port, locator1JmxPort, locators);
+      locatorLauncher.set(
+          startLocator(LOCATOR_1_NAME, locator1Dir, locator1Port, locator1JmxPort, locators));
     });
     locator2VM.invoke(() -> {
-      startLocator(locator2Name, locator2Dir, locator2Port, locator2JmxPort, locators);
+      locatorLauncher.set(
+          startLocator(LOCATOR_2_NAME, locator2Dir, locator2Port, locator2JmxPort, locators));
     });
 
-    serverVM.invoke(() -> startServer(serverName, serverDir, locators));
+    serverVM.invoke(() -> serverLauncher.set(startServer(serverDir, locators)));
 
     gfsh.connectAndVerify(locator1JmxPort, PortType.jmxManager);
 
-    regionName = "region1";
-    String createRegionCommand = "create region --type=REPLICATE --name=" + SEPARATOR + regionName;
+    String createRegionCommand = "create region --type=REPLICATE --name=" + SEPARATOR + REGION_NAME;
     gfsh.executeAndAssertThat(createRegionCommand).statusIsSuccess();
 
     addIgnoredException(AlertingIOException.class);
@@ -168,37 +175,35 @@
 
     mxbeansOnServer = serverVM.invoke(() -> {
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on server1")
-            .containsAll(expectedServerMXBeans(serverName, regionName));
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on server")
+            .containsAll(EXPECTED_SERVER_MXBEANS);
       });
-      return getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null);
+      return getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL);
     });
 
     mxbeansOnLocator1 = locator1VM.invoke(() -> {
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on locator1")
-            .containsAll(expectedServerMXBeans(serverName, regionName))
-            .containsAll(expectedLocatorMXBeans(locator1Name))
-            .containsAll(expectedLocatorMXBeans(locator2Name))
-            .containsAll(expectedDistributedMXBeans(regionName));
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on locator1")
+            .containsAll(EXPECTED_SERVER_MXBEANS)
+            .containsAll(EXPECTED_LOCATOR_1_MXBEANS)
+            .containsAll(EXPECTED_LOCATOR_2_MXBEANS)
+            .containsAll(EXPECTED_DISTRIBUTED_MXBEANS);
       });
-
-      return getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null);
+      return getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL);
     });
 
     mxbeansOnLocator2 = locator2VM.invoke(() -> {
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on locator2")
-            .containsAll(expectedServerMXBeans(serverName, regionName))
-            .containsAll(expectedLocatorMXBeans(locator2Name))
-            .containsAll(expectedLocatorMXBeans(locator1Name))
-            .containsAll(expectedDistributedMXBeans(regionName));
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on locator2")
+            .containsAll(EXPECTED_SERVER_MXBEANS)
+            .containsAll(EXPECTED_LOCATOR_2_MXBEANS)
+            .containsAll(EXPECTED_LOCATOR_1_MXBEANS)
+            .containsAll(EXPECTED_DISTRIBUTED_MXBEANS);
       });
-
-      return getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null);
+      return getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL);
     });
   }
 
@@ -207,19 +212,16 @@
     invokeInEveryVM(() -> {
       BEFORE.get().countDown();
       AFTER.get().countDown();
-      SERVER.getAndSet(DUMMY_SERVER).stop();
-      LOCATOR.getAndSet(DUMMY_LOCATOR).stop();
     });
-    disconnectAllFromDS();
   }
 
   @Test
   public void serverHasMemberTypeMXBeans() {
     serverVM.invoke(() -> {
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on server1")
-            .containsAll(expectedServerMXBeans(serverName, regionName));
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on server")
+            .containsAll(EXPECTED_SERVER_MXBEANS);
       });
     });
   }
@@ -229,9 +231,9 @@
     for (VM locatorVM : toArray(locator1VM, locator2VM)) {
       locatorVM.invoke(() -> {
         await().untilAsserted(() -> {
-          assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-              .as("GemFire mbeans on server1")
-              .containsAll(expectedServerMXBeans(serverName, regionName));
+          assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+              .as("GemFire mxbeans on server")
+              .containsAll(EXPECTED_SERVER_MXBEANS);
         });
       });
     }
@@ -241,17 +243,17 @@
   public void locatorHasMemberTypeMXBeansForBothLocators() {
     locator1VM.invoke(() -> {
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on locator1")
-            .containsAll(expectedLocatorMXBeans(locator1Name));
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on locator1")
+            .containsAll(EXPECTED_LOCATOR_1_MXBEANS);
       });
     });
 
     locator2VM.invoke(() -> {
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on locator2")
-            .containsAll(expectedLocatorMXBeans(locator2Name));
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on locator2")
+            .containsAll(EXPECTED_LOCATOR_2_MXBEANS);
       });
     });
   }
@@ -261,9 +263,9 @@
     for (VM locatorVM : toArray(locator1VM, locator2VM)) {
       locatorVM.invoke(() -> {
         await().untilAsserted(() -> {
-          assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-              .as("GemFire mbeans on locator" + getVMId())
-              .containsAll(expectedDistributedMXBeans(regionName));
+          assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+              .as("GemFire mxbeans on locator" + getVMId())
+              .containsAll(EXPECTED_DISTRIBUTED_MXBEANS);
         });
       });
     }
@@ -275,29 +277,30 @@
   @Test
   public void serverMXBeansOnServerAreUnaffectedByLocatorCrash() {
     locator1VM.invoke(() -> {
-      crashDistributedSystem(LOCATOR.get().getCache().getDistributedSystem());
+      crashDistributedSystem(locatorLauncher.get().getCache().getDistributedSystem());
 
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on locator1")
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on locator1")
             .isEmpty();
       });
     });
 
     serverVM.invoke(() -> {
-      assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-          .as("GemFire mbeans on server1")
-          .containsExactlyElementsOf(mxbeansOnServer);
+      assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+          .as("GemFire mxbeans on server")
+          .containsExactlyInAnyOrderElementsOf(mxbeansOnServer);
     });
 
     locator1VM.invoke(() -> {
-      InternalLocator locator = (InternalLocator) LOCATOR.get().getLocator();
+      InternalLocator locator = (InternalLocator) locatorLauncher.get().getLocator();
 
       await().untilAsserted(() -> {
         boolean isReconnected = locator.isReconnected();
         boolean isSharedConfigurationRunning = locator.isSharedConfigurationRunning();
         Set<ObjectName> mbeanNames =
-            getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null);
+            getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL);
+
         assertThat(isReconnected)
             .as("Locator is reconnected on locator1")
             .isTrue();
@@ -305,15 +308,15 @@
             .as("Locator shared configuration is running on locator1")
             .isTrue();
         assertThat(mbeanNames)
-            .as("GemFire mbeans on locator1")
+            .as("GemFire mxbeans on locator1")
             .isEqualTo(mxbeansOnLocator1);
       });
     });
 
     serverVM.invoke(() -> {
-      assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-          .as("GemFire mbeans on server1")
-          .containsExactlyElementsOf(mxbeansOnServer);
+      assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+          .as("GemFire mxbeans on server")
+          .containsExactlyInAnyOrderElementsOf(mxbeansOnServer);
     });
   }
 
@@ -323,48 +326,48 @@
   @Test
   public void serverMXBeansOnLocatorAreRestoredAfterCrashedServerReturns() {
     serverVM.invoke(() -> {
-      crashDistributedSystem(SERVER.get().getCache().getDistributedSystem());
+      crashDistributedSystem(serverLauncher.get().getCache().getDistributedSystem());
 
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on server1")
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on server")
             .isEmpty();
       });
     });
 
     locator1VM.invoke(() -> {
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on locator1")
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on locator1")
             .doesNotContainAnyElementsOf(mxbeansOnServer);
       });
     });
 
     serverVM.invoke(() -> {
-      InternalCache cache = (InternalCache) SERVER.get().getCache();
+      InternalCache cache = (InternalCache) serverLauncher.get().getCache();
       InternalDistributedSystem system = cache.getInternalDistributedSystem();
 
       await().untilAsserted(() -> {
         assertThat(system.isReconnecting())
-            .as("System is reconnecting on server1")
+            .as("System is reconnecting on server")
             .isTrue();
       });
 
-      system.waitUntilReconnected(TIMEOUT_MILLIS, MILLISECONDS);
+      system.waitUntilReconnected(getTimeout().toMillis(), MILLISECONDS);
 
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on server1")
-            .containsExactlyElementsOf(mxbeansOnServer);
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on server")
+            .containsExactlyInAnyOrderElementsOf(mxbeansOnServer);
       });
     });
 
     locator1VM.invoke(() -> {
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on locator1")
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on locator1")
             .containsAll(mxbeansOnServer)
-            .containsExactlyElementsOf(mxbeansOnLocator1);
+            .containsExactlyInAnyOrderElementsOf(mxbeansOnLocator1);
       });
     });
   }
@@ -384,31 +387,31 @@
         @Override
         public void reconnecting(InternalDistributedSystem oldSystem) {
           try {
-            BEFORE.get().await(TIMEOUT_MILLIS, MILLISECONDS);
+            BEFORE.get().await(getTimeout().toMillis(), MILLISECONDS);
           } catch (InterruptedException e) {
             errorCollector.addError(e);
           }
         }
       });
 
-      crashDistributedSystem(LOCATOR.get().getCache().getDistributedSystem());
+      crashDistributedSystem(locatorLauncher.get().getCache().getDistributedSystem());
 
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on locator1")
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on locator1")
             .isEmpty();
       });
     });
 
     locator2VM.invoke(() -> {
       Collection<ObjectName> locator1MXBeans = new ArrayList<>(mxbeansOnLocator1);
-      locator1MXBeans.removeAll(expectedServerMXBeans(serverName, regionName));
-      locator1MXBeans.removeAll(expectedLocatorMXBeans(locator2Name));
-      locator1MXBeans.removeAll(expectedDistributedMXBeans(regionName));
+      locator1MXBeans.removeAll(EXPECTED_SERVER_MXBEANS);
+      locator1MXBeans.removeAll(EXPECTED_LOCATOR_2_MXBEANS);
+      locator1MXBeans.removeAll(EXPECTED_DISTRIBUTED_MXBEANS);
 
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on locator2")
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on locator2")
             .doesNotContainAnyElementsOf(locator1MXBeans);
       });
     });
@@ -416,14 +419,14 @@
     locator1VM.invoke(() -> {
       BEFORE.get().countDown();
 
-      await().untilAsserted(() -> {
-        InternalLocator locator = (InternalLocator) LOCATOR.get().getLocator();
+      InternalLocator locator = (InternalLocator) locatorLauncher.get().getLocator();
 
+      await().untilAsserted(() -> {
         assertThat(locator.isSharedConfigurationRunning())
             .as("Locator shared configuration is running on locator1")
             .isTrue();
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on locator1")
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on locator1")
             .containsAll(mxbeansOnLocator1);
       });
     });
@@ -445,7 +448,7 @@
         @Override
         public void reconnecting(InternalDistributedSystem oldSystem) {
           try {
-            BEFORE.get().await(TIMEOUT_MILLIS, MILLISECONDS);
+            BEFORE.get().await(getTimeout().toMillis(), MILLISECONDS);
           } catch (InterruptedException e) {
             errorCollector.addError(e);
           }
@@ -458,11 +461,11 @@
         }
       });
 
-      crashDistributedSystem(SERVER.get().getCache().getDistributedSystem());
+      crashDistributedSystem(serverLauncher.get().getCache().getDistributedSystem());
 
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on server1")
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on server")
             .isEmpty();
       });
     });
@@ -470,8 +473,8 @@
     for (VM locatorVM : toArray(locator1VM, locator2VM)) {
       locatorVM.invoke(() -> {
         await().untilAsserted(() -> {
-          assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-              .as("GemFire mbeans on locator" + locatorVM.getId())
+          assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+              .as("GemFire mxbeans on locator" + locatorVM.getId())
               .isNotEmpty()
               .doesNotContainAnyElementsOf(mxbeansOnServer);
         });
@@ -480,35 +483,35 @@
 
     serverVM.invoke(() -> {
       BEFORE.get().countDown();
-      AFTER.get().await(TIMEOUT_MILLIS, MILLISECONDS);
+      AFTER.get().await(getTimeout().toMillis(), MILLISECONDS);
 
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on server1")
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on server")
             .isEqualTo(mxbeansOnServer);
       });
     });
 
     locator1VM.invoke(() -> {
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on locator1")
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on locator1")
             .containsAll(mxbeansOnLocator1);
       });
     });
 
     locator2VM.invoke(() -> {
       await().untilAsserted(() -> {
-        assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
-            .as("GemFire mbeans on locator2")
+        assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+            .as("GemFire mxbeans on locator2")
             .containsAll(mxbeansOnLocator2);
       });
     });
   }
 
-  private static void startLocator(String name, File workingDirectory, int locatorPort, int jmxPort,
-      String locators) {
-    LOCATOR.set(new LocatorLauncher.Builder()
+  private static LocatorLauncher startLocator(String name, File workingDirectory, int locatorPort,
+      int jmxPort, String locators) {
+    LocatorLauncher locatorLauncher = new LocatorLauncher.Builder()
         .setMemberName(name)
         .setPort(locatorPort)
         .setWorkingDirectory(workingDirectory.getAbsolutePath())
@@ -520,40 +523,44 @@
         .set(LOG_FILE, new File(workingDirectory, name + ".log").getAbsolutePath())
         .set(MAX_WAIT_TIME_RECONNECT, "1000")
         .set(MEMBER_TIMEOUT, "2000")
-        .build());
+        .build();
 
-    LOCATOR.get().start();
+    locatorLauncher.start();
+
+    InternalLocator locator = (InternalLocator) locatorLauncher.getLocator();
 
     await().untilAsserted(() -> {
-      InternalLocator locator = (InternalLocator) LOCATOR.get().getLocator();
       assertThat(locator.isSharedConfigurationRunning())
           .as("Locator shared configuration is running on locator" + getVMId())
           .isTrue();
     });
+
+    return locatorLauncher;
   }
 
-  private static void startServer(String name, File workingDirectory, String locators) {
-    SERVER.set(new ServerLauncher.Builder()
+  private static ServerLauncher startServer(File workingDirectory, String locators) {
+    ServerLauncher serverLauncher = new ServerLauncher.Builder()
         .setDisableDefaultServer(true)
-        .setMemberName(name)
+        .setMemberName(SERVER_NAME)
         .setWorkingDirectory(workingDirectory.getAbsolutePath())
         .set(HTTP_SERVICE_PORT, "0")
         .set(LOCATORS, locators)
-        .set(LOG_FILE, new File(workingDirectory, name + ".log").getAbsolutePath())
+        .set(LOG_FILE, new File(workingDirectory, SERVER_NAME + ".log").getAbsolutePath())
         .set(MAX_WAIT_TIME_RECONNECT, "1000")
         .set(MEMBER_TIMEOUT, "2000")
-        .build());
+        .build();
 
-    SERVER.get().start();
+    serverLauncher.start();
+
+    return serverLauncher;
   }
 
   private static Set<ObjectName> expectedServerMXBeans(String memberName, String regionName)
       throws MalformedObjectNameException {
     return new HashSet<>(asList(
         getInstance("GemFire:type=Member,member=" + memberName),
-        getInstance(
-            "GemFire:service=Region,name=" + SEPARATOR + regionName + ",type=Member,member=" +
-                memberName)));
+        getInstance("GemFire:service=Region,name=" + SEPARATOR + regionName +
+            ",type=Member,member=" + memberName)));
   }
 
   private static Set<ObjectName> expectedLocatorMXBeans(String memberName)
@@ -577,4 +584,12 @@
         getInstance("GemFire:service=Region,name=" + SEPARATOR + regionName + ",type=Distributed"),
         getInstance("GemFire:service=System,type=Distributed")));
   }
+
+  private static <V> V execute(Callable<V> task) {
+    try {
+      return task.call();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java
index 36cc35d..60faef5 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java
@@ -178,7 +178,7 @@
         (SystemManagementService) ManagementService.getManagementService(cache);
     service.startManager();
     FederatingManager federatingManager = service.getFederatingManager();
-    proxyFactory = federatingManager.getProxyFactory();
+    proxyFactory = federatingManager.proxyFactory();
 
     return locatorLauncher.getPort();
   }
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerConcurrencyIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerConcurrencyIntegrationTest.java
index 2efe96e..d9dc552 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerConcurrencyIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerConcurrencyIntegrationTest.java
@@ -82,7 +82,7 @@
 
     await().until(() -> !cache.getAllRegions().isEmpty());
 
-    assertThat(federatingManager.getAndResetLatestException()).isNull();
+    assertThat(federatingManager.latestException()).isNull();
   }
 
   private InternalDistributedMember member() throws UnknownHostException {
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java
deleted file mode 100644
index db6b2dd..0000000
--- a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.management.internal;
-
-import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.quality.Strictness.STRICT_STUBS;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.Executors;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnit;
-import org.mockito.junit.MockitoRule;
-
-import org.apache.geode.StatisticsFactory;
-import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.InternalCacheForClientAccess;
-import org.apache.geode.internal.statistics.StatisticsClock;
-import org.apache.geode.test.junit.categories.JMXTest;
-
-@Category(JMXTest.class)
-public class FederatingManagerIntegrationTest {
-  @Rule
-  public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
-
-  @Mock
-  public InternalCache cache;
-  @Mock
-  public InternalCacheForClientAccess cacheForClientAccess;
-  @Mock
-  public MBeanProxyFactory proxyFactory;
-  @Mock
-  public MemberMessenger messenger;
-  @Mock
-  public ManagementResourceRepo repo;
-  @Mock
-  public SystemManagementService service;
-  @Mock
-  public StatisticsFactory statisticsFactory;
-  @Mock
-  public StatisticsClock statisticsClock;
-  @Mock
-  public InternalDistributedSystem system;
-  @Mock
-  public DistributionManager distributionManager;
-
-  @Before
-  public void setUp() throws IOException, ClassNotFoundException {
-    when(cache.getCacheForProcessingClientRequests())
-        .thenReturn(cacheForClientAccess);
-    when(system.getDistributionManager())
-        .thenReturn(distributionManager);
-  }
-
-  @Test
-  public void restartDoesNotThrowIfOtherMembersExist() {
-    when(distributionManager.getOtherDistributionManagerIds())
-        .thenReturn(Collections.singleton(mock(InternalDistributedMember.class)));
-
-    FederatingManager federatingManager =
-        new FederatingManager(repo, system, service, cache, statisticsFactory,
-            statisticsClock, proxyFactory, messenger, Executors::newSingleThreadExecutor);
-
-    federatingManager.startManager();
-    federatingManager.stopManager();
-
-    assertThatCode(federatingManager::startManager)
-        .doesNotThrowAnyException();
-  }
-}
diff --git a/geode-core/src/main/java/org/apache/geode/logging/internal/LoggingSession.java b/geode-core/src/main/java/org/apache/geode/logging/internal/LoggingSession.java
index 1af2974..cadbe26 100644
--- a/geode-core/src/main/java/org/apache/geode/logging/internal/LoggingSession.java
+++ b/geode-core/src/main/java/org/apache/geode/logging/internal/LoggingSession.java
@@ -35,8 +35,8 @@
  */
 public class LoggingSession implements InternalSessionContext {
 
-  static final boolean STANDARD_OUTPUT_ALWAYS_ON =
-      Boolean.valueOf(System.getProperty(GEMFIRE_PREFIX + "standard-output-always-on", "false"));
+  private static final boolean STANDARD_OUTPUT_ALWAYS_ON =
+      Boolean.getBoolean(GEMFIRE_PREFIX + "standard-output-always-on");
 
   private static final Logger logger = LogService.getLogger();
 
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
index 714c9cb..3ba7c21 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
@@ -20,11 +20,13 @@
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
 
 import javax.management.Notification;
@@ -63,23 +65,26 @@
  *
  * @since GemFire 7.0
  */
-public class FederatingManager extends Manager {
+public class FederatingManager extends Manager implements ManagerMembership {
   private static final Logger logger = LogService.getLogger();
 
-  private final SystemManagementService service;
-  private final AtomicReference<Exception> latestException = new AtomicReference<>();
-
-  private final Supplier<ExecutorService> executorServiceSupplier;
-  private final MBeanProxyFactory proxyFactory;
-  private final MemberMessenger messenger;
-
   /**
    * This Executor uses a pool of thread to execute the member addition /removal tasks, This will
    * utilize the processing powers available. Going with unbounded queue because tasks wont be
    * unbounded in practical situation as number of members will be a finite set at any given point
    * of time
    */
-  private ExecutorService executorService;
+  private final AtomicReference<ExecutorService> executorService = new AtomicReference<>();
+  private final AtomicReference<Exception> latestException = new AtomicReference<>();
+  private final List<Runnable> pendingTasks = new CopyOnWriteArrayList<>();
+
+  private final SystemManagementService service;
+  private final Supplier<ExecutorService> executorServiceSupplier;
+  private final MBeanProxyFactory proxyFactory;
+  private final MemberMessenger messenger;
+  private final ReentrantLock lifecycleLock;
+
+  private volatile boolean starting;
 
   @VisibleForTesting
   FederatingManager(ManagementResourceRepo repo, InternalDistributedSystem system,
@@ -99,6 +104,7 @@
     this.proxyFactory = proxyFactory;
     this.messenger = messenger;
     this.executorServiceSupplier = executorServiceSupplier;
+    lifecycleLock = new ReentrantLock();
   }
 
   /**
@@ -108,31 +114,69 @@
   @Override
   public synchronized void startManager() {
     try {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Starting the Federating Manager.... ");
+      lifecycleLock.lock();
+      try {
+        if (starting || running) {
+          return;
+        }
+        if (logger.isDebugEnabled()) {
+          logger.debug("Starting the Federating Manager.... ");
+        }
+        starting = true;
+        executorService.set(executorServiceSupplier.get());
+        running = true;
+      } finally {
+        lifecycleLock.unlock();
       }
 
-      executorService = executorServiceSupplier.get();
-
-      running = true;
       startManagingActivity();
+
+      lifecycleLock.lock();
+      try {
+        for (Runnable task : pendingTasks) {
+          executeTask(task);
+        }
+      } finally {
+        pendingTasks.clear();
+        starting = false;
+        lifecycleLock.unlock();
+      }
+
       messenger.broadcastManagerInfo();
+
     } catch (Exception e) {
-      running = false;
+      cleanupFailedStart();
       throw new ManagementException(e);
     }
   }
 
+  private void cleanupFailedStart() {
+    lifecycleLock.lock();
+    try {
+      pendingTasks.clear();
+      running = false;
+      starting = false;
+    } finally {
+      lifecycleLock.unlock();
+    }
+  }
+
   @Override
   public synchronized void stopManager() {
-    // remove hidden management regions and federatedMBeans
-    if (!running) {
-      return;
+    lifecycleLock.lock();
+    try {
+      // remove hidden management regions and federatedMBeans
+      if (!running) {
+        return;
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("Stopping the Federating Manager.... ");
+      }
+      running = false;
+    } finally {
+      lifecycleLock.unlock();
     }
-    running = false;
-    if (logger.isDebugEnabled()) {
-      logger.debug("Stopping the Federating Manager.... ");
-    }
+
     stopManagingActivity();
   }
 
@@ -141,35 +185,56 @@
     return running;
   }
 
-  public MemberMessenger getMessenger() {
-    return messenger;
-  }
-
   /**
-   * This method will be invoked from MembershipListener which is registered when the member becomes
-   * a Management node.
-   *
-   * <p>
    * This method will delegate task to another thread and exit, so that it wont block the membership
    * listener
    */
-  void removeMember(DistributedMember member, boolean crashed) {
-    executeTask(new RemoveMemberTask(member, crashed));
+  @Override
+  public void addMember(InternalDistributedMember member) {
+    lifecycleLock.lock();
+    try {
+      if (!running) {
+        return;
+      }
+      executeTask(() -> new AddMemberTask(member).call());
+    } finally {
+      lifecycleLock.unlock();
+    }
   }
 
   /**
-   * This method will be invoked from MembershipListener which is registered when the member becomes
-   * a Management node.
-   *
-   * <p>
+   * This method will delegate task to another thread and exit, so that it wont block the membership
+   * listener
+   */
+  @Override
+  public void removeMember(DistributedMember member, boolean crashed) {
+    lifecycleLock.lock();
+    try {
+      Runnable task = new RemoveMemberTask(member, crashed);
+      if (starting) {
+        pendingTasks.add(task);
+      } else if (running) {
+        executeTask(task);
+      }
+    } finally {
+      lifecycleLock.unlock();
+    }
+  }
+
+  /**
    * this method will delegate task to another thread and exit, so that it wont block the membership
    * listener
    */
-  void suspectMember(DistributedMember member, InternalDistributedMember whoSuspected,
+  @Override
+  public void suspectMember(DistributedMember member, InternalDistributedMember whoSuspected,
       String reason) {
     service.memberSuspect((InternalDistributedMember) member, whoSuspected, reason);
   }
 
+  public MemberMessenger getMessenger() {
+    return messenger;
+  }
+
   /**
    * This will return the last updated time of the proxyMBean.
    *
@@ -207,30 +272,6 @@
   }
 
   /**
-   * This method will be invoked whenever a member stops being a managing node. The
-   * {@code ManagementException} has to be handled by the caller.
-   */
-  private void stopManagingActivity() {
-    try {
-      executorService.shutdownNow();
-
-      for (DistributedMember distributedMember : repo.getMonitoringRegionMap().keySet()) {
-        removeMemberArtifacts(distributedMember, false);
-      }
-    } catch (Exception e) {
-      throw new ManagementException(e);
-    }
-  }
-
-  private synchronized void executeTask(Runnable task) {
-    try {
-      executorService.execute(task);
-    } catch (RejectedExecutionException ignored) {
-      // Ignore, we are getting shutdown
-    }
-  }
-
-  /**
    * This method will be invoked when a node transitions from managed node to managing node This
    * method will block for all GIIs to be completed But each GII is given a specific time frame.
    * After that the task will be marked as cancelled.
@@ -242,7 +283,7 @@
 
     for (InternalDistributedMember member : system.getDistributionManager()
         .getOtherDistributionManagerIds()) {
-      giiTaskList.add(new GIITask(member));
+      giiTaskList.add(new AddMemberTask(member));
     }
 
     try {
@@ -250,7 +291,7 @@
         logger.debug("Management Resource creation started  : ");
       }
       List<Future<InternalDistributedMember>> futureTaskList =
-          executorService.invokeAll(giiTaskList);
+          executorService.get().invokeAll(giiTaskList);
 
       for (Future<InternalDistributedMember> futureTask : futureTaskList) {
         try {
@@ -296,76 +337,27 @@
   }
 
   /**
-   * This method will be invoked from MembershipListener which is registered when the member becomes
-   * a Management node.
-   *
-   * <p>
-   * This method will delegate task to another thread and exit, so that it wont block the membership
-   * listener
+   * This method will be invoked whenever a member stops being a managing node. The
+   * {@code ManagementException} has to be handled by the caller.
    */
-  @VisibleForTesting
-  void addMember(InternalDistributedMember member) {
-    GIITask giiTask = new GIITask(member);
-    executeTask(() -> {
-      try {
-        giiTask.call();
-      } catch (RuntimeException e) {
-        logger.warn("Error federating new member {}", member.getId(), e);
-        latestException.set(e);
+  private void stopManagingActivity() {
+    try {
+      executorService.get().shutdownNow();
+
+      for (DistributedMember distributedMember : repo.getMonitoringRegionMap().keySet()) {
+        removeMemberArtifacts(distributedMember, false);
       }
-    });
-  }
-
-  @VisibleForTesting
-  void removeMemberArtifacts(DistributedMember member, boolean crashed) {
-    Region<String, Object> monitoringRegion = repo.getEntryFromMonitoringRegionMap(member);
-    Region<NotificationKey, Notification> notificationRegion =
-        repo.getEntryFromNotifRegionMap(member);
-
-    if (monitoringRegion == null && notificationRegion == null) {
-      return;
-    }
-
-    repo.romoveEntryFromMonitoringRegionMap(member);
-    repo.removeEntryFromNotifRegionMap(member);
-
-    // If cache is closed all the regions would have been destroyed implicitly
-    if (!cache.isClosed()) {
-      try {
-        if (monitoringRegion != null) {
-          proxyFactory.removeAllProxies(member, monitoringRegion);
-          monitoringRegion.localDestroyRegion();
-        }
-      } catch (CancelException | RegionDestroyedException ignore) {
-        // ignored
-      }
-
-      try {
-        if (notificationRegion != null) {
-          notificationRegion.localDestroyRegion();
-        }
-      } catch (CancelException | RegionDestroyedException ignore) {
-        // ignored
-      }
-    }
-
-    if (!system.getDistributedMember().equals(member)) {
-      try {
-        service.memberDeparted((InternalDistributedMember) member, crashed);
-      } catch (CancelException | RegionDestroyedException ignore) {
-        // ignored
-      }
+    } catch (Exception e) {
+      throw new ManagementException(e);
     }
   }
 
-  @VisibleForTesting
-  public MBeanProxyFactory getProxyFactory() {
-    return proxyFactory;
-  }
-
-  @VisibleForTesting
-  synchronized Exception getAndResetLatestException() {
-    return latestException.getAndSet(null);
+  private synchronized void executeTask(Runnable task) {
+    try {
+      executorService.get().execute(task);
+    } catch (RejectedExecutionException ignored) {
+      // Ignore, we are getting shutdown
+    }
   }
 
   @VisibleForTesting
@@ -503,19 +495,80 @@
     }
   }
 
+  @VisibleForTesting
+  void removeMemberArtifacts(DistributedMember member, boolean crashed) {
+    Region<String, Object> monitoringRegion = repo.getEntryFromMonitoringRegionMap(member);
+    Region<NotificationKey, Notification> notificationRegion =
+        repo.getEntryFromNotifRegionMap(member);
+
+    if (monitoringRegion == null && notificationRegion == null) {
+      return;
+    }
+
+    repo.romoveEntryFromMonitoringRegionMap(member);
+    repo.removeEntryFromNotifRegionMap(member);
+
+    // If cache is closed all the regions would have been destroyed implicitly
+    if (!cache.isClosed()) {
+      try {
+        if (monitoringRegion != null) {
+          proxyFactory.removeAllProxies(member, monitoringRegion);
+          monitoringRegion.localDestroyRegion();
+        }
+      } catch (CancelException | RegionDestroyedException ignore) {
+        // ignored
+      }
+
+      try {
+        if (notificationRegion != null) {
+          notificationRegion.localDestroyRegion();
+        }
+      } catch (CancelException | RegionDestroyedException ignore) {
+        // ignored
+      }
+    }
+
+    if (!system.getDistributedMember().equals(member)) {
+      try {
+        service.memberDeparted((InternalDistributedMember) member, crashed);
+      } catch (CancelException | RegionDestroyedException ignore) {
+        // ignored
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public MBeanProxyFactory proxyFactory() {
+    return proxyFactory;
+  }
+
+  @VisibleForTesting
+  Exception latestException() {
+    return latestException.getAndSet(null);
+  }
+
+  @VisibleForTesting
+  List<Runnable> pendingTasks() {
+    return pendingTasks;
+  }
+
+  @VisibleForTesting
+  boolean isStarting() {
+    return starting;
+  }
+
   /**
-   * Actual task of doing the GII
+   * Actual task of adding a member.
    *
    * <p>
-   * It will perform the GII request which might originate from TransitionListener or Membership
-   * Listener.
+   * Perform the GII request which might originate from transition listener or membership listener.
    *
    * <p>
-   * Managing Node side resources are created per member which is visible to this node:
+   * Manager resources are created per member which is visible to this node:
    *
    * <pre>
-   * 1)Management Region : its a Replicated NO_ACK region
-   * 2)Notification Region : its a Replicated Proxy NO_ACK region
+   * 1) Management Region : its a Replicated NO_ACK region
+   * 2) Notification Region : its a Replicated Proxy NO_ACK region
    * </pre>
    *
    * <p>
@@ -528,13 +581,13 @@
    *
    * <p>
    * This task can be cancelled from the calling thread if a timeout happens. In that case we have
-   * to handle the thread interrupt
+   * to handle the thread interrupt.
    */
-  private class GIITask implements Callable<InternalDistributedMember> {
+  private class AddMemberTask implements Callable<InternalDistributedMember> {
 
     private final InternalDistributedMember member;
 
-    private GIITask(InternalDistributedMember member) {
+    private AddMemberTask(InternalDistributedMember member) {
       this.member = member;
     }
 
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/LocalManager.java b/geode-core/src/main/java/org/apache/geode/management/internal/LocalManager.java
index b67d8f5..b28af5b 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/LocalManager.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/LocalManager.java
@@ -14,12 +14,15 @@
  */
 package org.apache.geode.management.internal;
 
+import static org.apache.geode.logging.internal.executors.LoggingExecutors.newSingleThreadScheduledExecutor;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.management.MalformedObjectNameException;
 import javax.management.Notification;
@@ -43,7 +46,6 @@
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalRegionFactory;
 import org.apache.geode.internal.statistics.StatisticsClock;
-import org.apache.geode.logging.internal.executors.LoggingExecutors;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.management.ManagementException;
 
@@ -61,13 +63,14 @@
   /**
    * Management Task pushes data to the admin regions
    */
-  private ManagementTask managementTask;
+  private final AtomicReference<ManagementTask> managementTask = new AtomicReference<>();
 
   /**
    * This service will be responsible for executing ManagementTasks and periodically push data to
    * localMonitoringRegion
    */
-  private ScheduledExecutorService singleThreadFederationScheduler;
+  private final AtomicReference<ScheduledExecutorService> singleThreadFederationScheduler =
+      new AtomicReference<>();
 
   /**
    * This map holds all the components which are eligible for federation. Although filters might
@@ -103,8 +106,7 @@
       if (repo.getLocalMonitoringRegion() != null) {
         return;
       }
-      singleThreadFederationScheduler =
-          LoggingExecutors.newSingleThreadScheduledExecutor("Management Task");
+      singleThreadFederationScheduler.set(newSingleThreadScheduledExecutor("Management Task"));
 
       if (logger.isDebugEnabled()) {
         logger.debug("Creating  Management Region :");
@@ -165,14 +167,14 @@
         }
       }
 
-      managementTask = new ManagementTask();
+      managementTask.set(new ManagementTask());
       // call run to get us initialized immediately with a sync call
-      managementTask.run();
+      managementTask.get().run();
       // All local resources are created for the ManagementTask
       // Now Management tasks can proceed.
       int updateRate = system.getConfig().getJmxManagerUpdateRate();
-      singleThreadFederationScheduler.scheduleAtFixedRate(managementTask, updateRate, updateRate,
-          TimeUnit.MILLISECONDS);
+      singleThreadFederationScheduler.get().scheduleAtFixedRate(managementTask.get(), updateRate,
+          updateRate, TimeUnit.MILLISECONDS);
 
       if (logger.isDebugEnabled()) {
         logger.debug("Management Region created with Name : {}",
@@ -206,8 +208,9 @@
   private void shutdownTasks() {
     // No need of pooledGIIExecutor as this node wont do GII again
     // so better to release resources
-    if (singleThreadFederationScheduler != null) {
-      singleThreadFederationScheduler.shutdownNow();
+    ScheduledExecutorService executor = singleThreadFederationScheduler.get();
+    if (executor != null) {
+      executor.shutdownNow();
     }
   }
 
@@ -246,7 +249,7 @@
    */
   @VisibleForTesting
   public ScheduledExecutorService getFederationScheduler() {
-    return singleThreadFederationScheduler;
+    return singleThreadFederationScheduler.get();
   }
 
   /**
@@ -255,7 +258,7 @@
    */
   @VisibleForTesting
   public void runManagementTaskAdhoc() {
-    managementTask.run();
+    managementTask.get().run();
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java b/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java
index 7a48465..2adcb47 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java
@@ -28,21 +28,11 @@
  *
  * @since GemFire 7.0
  */
-public abstract class Manager {
+public abstract class Manager implements ManagerLifecycle {
 
   protected final InternalCacheForClientAccess cache;
 
   /**
-   * depicts whether this node is a Managing node or not
-   */
-  protected volatile boolean running;
-
-  /**
-   * depicts whether this node is a Managing node or not
-   */
-  protected volatile boolean stopCacheOps;
-
-  /**
    * This is a single window to manipulate region resources for management
    */
   protected final ManagementResourceRepo repo;
@@ -56,8 +46,15 @@
 
   protected final StatisticsClock statisticsClock;
 
-  public Manager(ManagementResourceRepo repo, InternalDistributedSystem system,
-      InternalCache cache, StatisticsFactory statisticsFactory, StatisticsClock statisticsClock) {
+  /**
+   * True if this node is a Geode JMX manager.
+   */
+  protected volatile boolean running;
+
+  protected volatile boolean stopCacheOps;
+
+  Manager(ManagementResourceRepo repo, InternalDistributedSystem system, InternalCache cache,
+      StatisticsFactory statisticsFactory, StatisticsClock statisticsClock) {
     this.repo = repo;
     this.cache = cache.getCacheForProcessingClientRequests();
     this.system = system;
@@ -65,12 +62,6 @@
     this.statisticsClock = statisticsClock;
   }
 
-  public abstract boolean isRunning();
-
-  public abstract void startManager();
-
-  public abstract void stopManager();
-
   @VisibleForTesting
   public ManagementResourceRepo getManagementResourceRepo() {
     return repo;
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/ManagerLifecycle.java b/geode-core/src/main/java/org/apache/geode/management/internal/ManagerLifecycle.java
new file mode 100644
index 0000000..0ae8382
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/ManagerLifecycle.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal;
+
+/**
+ * Lifecycle operations for Geode JMX managers.
+ */
+public interface ManagerLifecycle {
+
+  /**
+   * Start the manager.
+   */
+  void startManager();
+
+  /**
+   * Stop the manager.
+   */
+  void stopManager();
+
+  /**
+   * Returns true if the manager is running.
+   */
+  boolean isRunning();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/ManagerMembership.java b/geode-core/src/main/java/org/apache/geode/management/internal/ManagerMembership.java
new file mode 100644
index 0000000..845ad55
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/ManagerMembership.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal;
+
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+
+/**
+ * Membership operations for Geode JMX managers.
+ */
+public interface ManagerMembership {
+
+  /**
+   * This method will be invoked from MembershipListener which is registered when the member becomes
+   * a Management node.
+   */
+  void addMember(InternalDistributedMember member);
+
+  /**
+   * This method will be invoked from MembershipListener which is registered when the member becomes
+   * a Management node.
+   */
+  void removeMember(DistributedMember member, boolean crashed);
+
+  /**
+   * This method will be invoked from MembershipListener which is registered when the member becomes
+   * a Management node.
+   */
+  void suspectMember(DistributedMember member, InternalDistributedMember whoSuspected,
+      String reason);
+}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java
index 27588c2..10a2f75 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java
@@ -23,6 +23,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -283,6 +284,8 @@
     }
 
     if (mapOfMembers != null) {
+      Objects.requireNonNull(objectName);
+      Objects.requireNonNull(proxy);
       mapOfMembers.put(objectName, proxy);
       memberSetSize = mapOfMembers.values().size();
 
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java
index 2de1c51..3fda782 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java
@@ -14,23 +14,37 @@
  */
 package org.apache.geode.management.internal;
 
+import static java.util.Collections.singleton;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentCaptor.forClass;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.powermock.api.mockito.PowerMockito.when;
+import static org.mockito.Mockito.when;
 
 import java.net.InetAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.function.Supplier;
 
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ErrorCollector;
 import org.mockito.ArgumentCaptor;
 
 import org.apache.geode.StatisticsFactory;
@@ -48,17 +62,17 @@
 import org.apache.geode.internal.cache.InternalRegionFactory;
 import org.apache.geode.internal.statistics.StatisticsClock;
 import org.apache.geode.management.DistributedSystemMXBean;
+import org.apache.geode.management.ManagementException;
 import org.apache.geode.test.junit.categories.JMXTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
 @Category(JMXTest.class)
 public class FederatingManagerTest {
 
   private InternalCache cache;
-  private InternalCacheForClientAccess cacheForClientAccess;
   private ExecutorService executorService;
-  private MBeanJMXAdapter jmxAdapter;
-  private MBeanProxyFactory proxyFactory;
   private MemberMessenger messenger;
+  private MBeanProxyFactory proxyFactory;
   private ManagementResourceRepo repo;
   private SystemManagementService service;
   private StatisticsFactory statisticsFactory;
@@ -67,12 +81,15 @@
   private InternalRegionFactory regionFactory1;
   private InternalRegionFactory regionFactory2;
 
+  @Rule
+  public ErrorCollector errorCollector = new ErrorCollector();
+  @Rule
+  public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
   @Before
-  public void setUp() throws Exception {
+  public void setUp() {
     cache = mock(InternalCache.class);
-    cacheForClientAccess = mock(InternalCacheForClientAccess.class);
     executorService = mock(ExecutorService.class);
-    jmxAdapter = mock(MBeanJMXAdapter.class);
     messenger = mock(MemberMessenger.class);
     proxyFactory = mock(MBeanProxyFactory.class);
     repo = mock(ManagementResourceRepo.class);
@@ -83,14 +100,19 @@
     regionFactory1 = mock(InternalRegionFactory.class);
     regionFactory2 = mock(InternalRegionFactory.class);
 
+    InternalCacheForClientAccess cacheForClientAccess = mock(InternalCacheForClientAccess.class);
     DistributedSystemMXBean distributedSystemMXBean = mock(DistributedSystemMXBean.class);
+    MBeanJMXAdapter jmxAdapter = mock(MBeanJMXAdapter.class);
 
     when(cache.getCacheForProcessingClientRequests())
         .thenReturn(cacheForClientAccess);
-    when(cacheForClientAccess.createInternalRegionFactory()).thenReturn(regionFactory1)
+    when(cacheForClientAccess.createInternalRegionFactory())
+        .thenReturn(regionFactory1)
         .thenReturn(regionFactory2);
-    when(regionFactory1.create(any())).thenReturn(mock(Region.class));
-    when(regionFactory2.create(any())).thenReturn(mock(Region.class));
+    when(regionFactory1.create(any()))
+        .thenReturn(mock(Region.class));
+    when(regionFactory2.create(any()))
+        .thenReturn(mock(Region.class));
     when(distributedSystemMXBean.getAlertLevel())
         .thenReturn(AlertLevel.WARNING.name());
     when(jmxAdapter.getDistributedSystemMXBean())
@@ -100,9 +122,10 @@
   }
 
   @Test
-  public void addMemberArtifactsCreatesMonitoringRegion() throws Exception {
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+  public void addMemberArtifactsCreatesMonitoringRegion() {
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
     federatingManager.startManager();
 
     federatingManager.addMemberArtifacts(member(1, 20));
@@ -111,23 +134,24 @@
   }
 
   @Test
-  public void addMemberArtifactsCreatesMonitoringRegionWithHasOwnStats() throws Exception {
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+  public void addMemberArtifactsCreatesMonitoringRegionWithHasOwnStats() {
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
     federatingManager.startManager();
 
     federatingManager.addMemberArtifacts(member(2, 40));
 
-    ArgumentCaptor<HasCachePerfStats> captor =
-        ArgumentCaptor.forClass(HasCachePerfStats.class);
+    ArgumentCaptor<HasCachePerfStats> captor = forClass(HasCachePerfStats.class);
     verify(regionFactory1).setCachePerfStatsHolder(captor.capture());
     assertThat(captor.getValue().hasOwnStats()).isTrue();
   }
 
   @Test
   public void addMemberArtifactsCreatesNotificationRegion() {
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
     federatingManager.startManager();
 
     federatingManager.addMemberArtifacts(member(3, 60));
@@ -137,14 +161,14 @@
 
   @Test
   public void addMemberArtifactsCreatesNotificationRegionWithHasOwnStats() {
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
     federatingManager.startManager();
 
     federatingManager.addMemberArtifacts(member(4, 80));
 
-    ArgumentCaptor<HasCachePerfStats> captor =
-        ArgumentCaptor.forClass(HasCachePerfStats.class);
+    ArgumentCaptor<HasCachePerfStats> captor = forClass(HasCachePerfStats.class);
     verify(regionFactory2).setCachePerfStatsHolder(captor.capture());
     assertThat(captor.getValue().hasOwnStats()).isTrue();
   }
@@ -159,13 +183,13 @@
         .thenReturn(mock(Region.class));
     when(system.getDistributedMember())
         .thenReturn(member);
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
 
     federatingManager.removeMemberArtifacts(member, false);
 
-    verify(monitoringRegion)
-        .localDestroyRegion();
+    verify(monitoringRegion).localDestroyRegion();
   }
 
   @Test
@@ -178,13 +202,13 @@
         .thenReturn(notificationRegion);
     when(system.getDistributedMember())
         .thenReturn(member);
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
 
     federatingManager.removeMemberArtifacts(member, false);
 
-    verify(notificationRegion)
-        .localDestroyRegion();
+    verify(notificationRegion).localDestroyRegion();
   }
 
   @Test
@@ -199,13 +223,13 @@
         .thenReturn(mock(Region.class));
     when(system.getDistributedMember())
         .thenReturn(member);
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
 
     federatingManager.removeMemberArtifacts(member, false);
 
-    verify(monitoringRegion)
-        .localDestroyRegion();
+    verify(monitoringRegion).localDestroyRegion();
   }
 
   @Test
@@ -220,13 +244,13 @@
         .thenReturn(notificationRegion);
     when(system.getDistributedMember())
         .thenReturn(member);
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
 
     federatingManager.removeMemberArtifacts(member, false);
 
-    verify(notificationRegion)
-        .localDestroyRegion();
+    verify(notificationRegion).localDestroyRegion();
   }
 
   @Test
@@ -242,13 +266,13 @@
         .thenReturn(mock(Region.class));
     when(system.getDistributedMember())
         .thenReturn(member);
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
 
     federatingManager.removeMemberArtifacts(member, false);
 
-    verify(proxyFactory)
-        .removeAllProxies(member, monitoringRegion);
+    verify(proxyFactory).removeAllProxies(member, monitoringRegion);
   }
 
   @Test
@@ -264,13 +288,13 @@
         .thenReturn(mock(Region.class));
     when(system.getDistributedMember())
         .thenReturn(member);
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
 
     federatingManager.removeMemberArtifacts(member, false);
 
-    verify(proxyFactory)
-        .removeAllProxies(member, monitoringRegion);
+    verify(proxyFactory).removeAllProxies(member, monitoringRegion);
   }
 
   @Test
@@ -285,13 +309,13 @@
         .thenReturn(notificationRegion);
     when(system.getDistributedMember())
         .thenReturn(member);
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
 
     federatingManager.removeMemberArtifacts(member, false);
 
-    verify(notificationRegion)
-        .localDestroyRegion();
+    verify(notificationRegion).localDestroyRegion();
   }
 
   @Test
@@ -306,13 +330,13 @@
         .thenReturn(mock(Region.class));
     when(system.getDistributedMember())
         .thenReturn(member);
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
 
     federatingManager.removeMemberArtifacts(member, false);
 
-    verify(monitoringRegion)
-        .localDestroyRegion();
+    verify(monitoringRegion).localDestroyRegion();
   }
 
   @Test
@@ -328,13 +352,13 @@
         .thenReturn(mock(Region.class));
     when(system.getDistributedMember())
         .thenReturn(member);
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
 
     federatingManager.removeMemberArtifacts(member, false);
 
-    verify(proxyFactory)
-        .removeAllProxies(member, monitoringRegion);
+    verify(proxyFactory).removeAllProxies(member, monitoringRegion);
   }
 
   @Test
@@ -349,13 +373,13 @@
         .thenReturn(notificationRegion);
     when(system.getDistributedMember())
         .thenReturn(member);
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
 
     federatingManager.removeMemberArtifacts(member, false);
 
-    verify(notificationRegion)
-        .localDestroyRegion();
+    verify(notificationRegion).localDestroyRegion();
   }
 
   @Test
@@ -370,13 +394,13 @@
         .thenReturn(mock(Region.class));
     when(system.getDistributedMember())
         .thenReturn(member);
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
 
     federatingManager.removeMemberArtifacts(member, false);
 
-    verify(monitoringRegion)
-        .localDestroyRegion();
+    verify(monitoringRegion).localDestroyRegion();
   }
 
   @Test
@@ -388,13 +412,13 @@
         .thenReturn(null);
     when(system.getDistributedMember())
         .thenReturn(member);
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
 
     Throwable thrown = catchThrowable(() -> federatingManager.removeMemberArtifacts(member, false));
 
-    assertThat(thrown)
-        .isNull();
+    assertThat(thrown).isNull();
   }
 
   @Test
@@ -406,22 +430,23 @@
         .thenReturn(mock(Region.class));
     when(system.getDistributedMember())
         .thenReturn(member);
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
 
     Throwable thrown = catchThrowable(() -> federatingManager.removeMemberArtifacts(member, false));
 
-    assertThat(thrown)
-        .isNull();
+    assertThat(thrown).isNull();
   }
 
   @Test
   public void startManagerGetsNewExecutorServiceFromSupplier() {
-    @SuppressWarnings("unchecked")
     Supplier<ExecutorService> executorServiceSupplier = mock(Supplier.class);
-    when(executorServiceSupplier.get()).thenReturn(mock(ExecutorService.class));
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorServiceSupplier);
+    when(executorServiceSupplier.get())
+        .thenReturn(mock(ExecutorService.class));
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorServiceSupplier);
 
     federatingManager.startManager();
 
@@ -437,23 +462,171 @@
         .thenReturn(mock(Region.class));
     when(system.getDistributedMember())
         .thenReturn(member);
-    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
-        statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory,
+            statisticsClock, proxyFactory, messenger, executorService);
 
     federatingManager.removeMemberArtifacts(member, false);
 
     verifyNoMoreInteractions(proxyFactory);
   }
 
+  @Test
+  public void removeMemberWaitsForStartManager() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    CyclicBarrier barrier = new CyclicBarrier(2);
+    ExecutorService executorService = mock(ExecutorService.class);
+    List<Future<InternalDistributedMember>> futureTaskList = Collections.emptyList();
+
+    when(executorService.invokeAll(any())).thenAnswer(invocation -> {
+      awaitCyclicBarrier(barrier);
+      awaitCountDownLatch(latch);
+      return futureTaskList;
+    });
+
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
+
+    executorServiceRule.submit(() -> {
+      federatingManager.startManager();
+    });
+
+    executorServiceRule.submit(() -> {
+      awaitCyclicBarrier(barrier);
+      federatingManager.removeMember(member(), true);
+    });
+
+    await().untilAsserted(() -> {
+      assertThat(federatingManager.pendingTasks()).hasSize(1);
+    });
+
+    latch.countDown();
+
+    await().untilAsserted(() -> {
+      assertThat(federatingManager.pendingTasks()).isEmpty();
+    });
+  }
+
+  @Test
+  public void pendingTasksIsEmptyByDefault() {
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
+
+    assertThat(federatingManager.pendingTasks()).isEmpty();
+  }
+
+  @Test
+  public void restartDoesNotThrowIfOtherMembersExist() {
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    when(distributionManager.getOtherDistributionManagerIds())
+        .thenReturn(singleton(mock(InternalDistributedMember.class)));
+    when(system.getDistributionManager())
+        .thenReturn(distributionManager);
+
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, Executors::newSingleThreadExecutor);
+
+    federatingManager.startManager();
+    federatingManager.stopManager();
+
+    assertThatCode(federatingManager::startManager)
+        .doesNotThrowAnyException();
+  }
+
+  @Test
+  public void startManagerThrowsManagementExceptionWithNestedCauseOfFailure() {
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
+    RuntimeException exception = new RuntimeException("startManager failed");
+    doThrow(exception)
+        .when(messenger).broadcastManagerInfo();
+
+    Throwable thrown = catchThrowable(() -> federatingManager.startManager());
+
+    assertThat(thrown)
+        .isInstanceOf(ManagementException.class)
+        .hasCause(exception);
+  }
+
+  @Test
+  public void pendingTasksIsClearIfStartManagerFails() {
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
+    RuntimeException exception = new RuntimeException("startManager failed");
+    doThrow(exception)
+        .when(messenger).broadcastManagerInfo();
+
+    Throwable thrown = catchThrowable(() -> federatingManager.startManager());
+    assertThat(thrown).isNotNull();
+
+    assertThat(federatingManager.pendingTasks()).isEmpty();
+  }
+
+  @Test
+  public void startingIsFalseIfStartManagerFails() {
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
+    RuntimeException exception = new RuntimeException("startManager failed");
+    doThrow(exception)
+        .when(messenger).broadcastManagerInfo();
+
+    Throwable thrown = catchThrowable(() -> federatingManager.startManager());
+    assertThat(thrown).isNotNull();
+
+    assertThat(federatingManager.isStarting()).isFalse();
+  }
+
+  @Test
+  public void runningIsFalseIfStartManagerFails() {
+    FederatingManager federatingManager =
+        new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+            proxyFactory, messenger, executorService);
+    RuntimeException exception = new RuntimeException("startManager failed");
+    doThrow(exception)
+        .when(messenger).broadcastManagerInfo();
+
+    Throwable thrown = catchThrowable(() -> federatingManager.startManager());
+    assertThat(thrown).isNotNull();
+
+    assertThat(federatingManager.isRunning()).isFalse();
+  }
+
+  private void awaitCyclicBarrier(CyclicBarrier barrier) {
+    try {
+      barrier.await(getTimeout().toMillis(), MILLISECONDS);
+    } catch (Exception e) {
+      errorCollector.addError(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void awaitCountDownLatch(CountDownLatch latch) {
+    try {
+      latch.await(getTimeout().toMillis(), MILLISECONDS);
+    } catch (Exception e) {
+      errorCollector.addError(e);
+      throw new RuntimeException(e);
+    }
+  }
+
   private InternalDistributedMember member() {
     return member(1, 1);
   }
 
   private InternalDistributedMember member(int viewId, int port) {
     InternalDistributedMember member = mock(InternalDistributedMember.class);
-    when(member.getInetAddress()).thenReturn(mock(InetAddress.class));
-    when(member.getVmViewId()).thenReturn(viewId);
-    when(member.getMembershipPort()).thenReturn(port);
+    when(member.getInetAddress())
+        .thenReturn(mock(InetAddress.class));
+    when(member.getVmViewId())
+        .thenReturn(viewId);
+    when(member.getMembershipPort())
+        .thenReturn(port);
     return member;
   }
 }