GEODE-7330: Prevent RegionDestroyedException in FederatingManager (#4193)

* GEODE-7330: Prevent RegionDestroyedException in FederatingManager

Cleanup management classes:
* Reduce method and field visibility as much as possible
* Remove unnecessary uses of this
* Reorder fields and methods based on visibility and other modifiers
* Use @VisibleForTesting annotation
* Fixup formatting and variable names

Make all FederatingManager fields final:
* Remove field setters for tests
* Introduce FederatingManagerFactory
* Add FederatingManagerFactory system property for tests to
  SystemManagementService
* Inject all FederatingManager fields via constructor
* Use Geode APIs in MBeanFederationErrorPathDUnitTest

Rename MBeanFederationErrorHandlingDistributedTest:
* Rename MBeanFederationErrorPathDUnitTest as
  MBeanFederationErrorHandlingDistributedTest
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java
new file mode 100644
index 0000000..06be411
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.management.internal.SystemManagementService.FEDERATING_MANAGER_FACTORY_PROPERTY;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getController;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.ExecutorService;
+
+import javax.management.ObjectName;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.stubbing.Answer;
+
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.distributed.LocatorLauncher;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.statistics.StatisticsClock;
+import org.apache.geode.management.ManagementService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.dunit.rules.SharedErrorCollector;
+import org.apache.geode.test.junit.categories.JMXTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+
+@Category(JMXTest.class)
+public class MBeanFederationErrorHandlingDistributedTest implements Serializable {
+
+  private static final String REGION_NAME = "test-region-1";
+
+  private static LocatorLauncher locatorLauncher;
+  private static ServerLauncher serverLauncher;
+  private static MBeanProxyFactory proxyFactory;
+
+  private ObjectName regionMXBeanName;
+  private String locatorName;
+  private String serverName;
+  private int locatorPort;
+  private VM locatorVM;
+  private VM serverVM;
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public SharedErrorCollector errorCollector = new SharedErrorCollector();
+
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+  @Before
+  public void setUp() throws Exception {
+    locatorName = "locator";
+    serverName = "server";
+    regionMXBeanName =
+        new ObjectName(String.format("GemFire:service=Region,name=\"%s\",type=Member,member=%s",
+            SEPARATOR + REGION_NAME, serverName));
+
+    locatorVM = getController();
+    serverVM = getVM(0);
+
+    locatorPort = locatorVM.invoke(this::startLocator);
+
+    serverVM.invoke(this::startServer);
+  }
+
+  @After
+  public void tearDown() {
+    locatorVM.invoke(() -> {
+      if (locatorLauncher != null) {
+        locatorLauncher.stop();
+        locatorLauncher = null;
+        proxyFactory = null;
+      }
+    });
+
+    serverVM.invoke(() -> {
+      if (serverLauncher != null) {
+        serverLauncher.stop();
+        serverLauncher = null;
+      }
+    });
+  }
+
+  @Test
+  public void destroyMBeanBeforeFederationCompletes() {
+    locatorVM.invoke(() -> doAnswer((Answer<Void>) invocation -> {
+      serverVM.invoke(() -> {
+        Region region = serverLauncher.getCache().getRegion(REGION_NAME);
+        region.destroyRegion();
+      });
+
+      Region<String, Object> monitoringRegion = invocation.getArgument(2);
+      monitoringRegion.destroy(regionMXBeanName.toString());
+
+      assertThat(monitoringRegion.get(regionMXBeanName.toString())).isNull();
+
+      try {
+        invocation.callRealMethod();
+      } catch (Exception e) {
+        if (!locatorLauncher.getCache().isClosed()) {
+          errorCollector.addError(e);
+        }
+      }
+
+      return null;
+    })
+        .when(proxyFactory).createProxy(any(), eq(regionMXBeanName), any(), any()));
+
+    serverVM.invoke(() -> {
+      serverLauncher.getCache().createRegionFactory(REPLICATE).create(REGION_NAME);
+    });
+
+    locatorVM.invoke(() -> {
+      await().untilAsserted(
+          () -> verify(proxyFactory).createProxy(any(), eq(regionMXBeanName), any(), any()));
+    });
+  }
+
+  private int startLocator() throws IOException {
+    System.setProperty(FEDERATING_MANAGER_FACTORY_PROPERTY,
+        FederatingManagerFactoryWithSpy.class.getName());
+
+    locatorLauncher = new LocatorLauncher.Builder()
+        .setMemberName(locatorName)
+        .setPort(0)
+        .setWorkingDirectory(temporaryFolder.newFolder(locatorName).getAbsolutePath())
+        .set(HTTP_SERVICE_PORT, "0")
+        .set(JMX_MANAGER_PORT, "0")
+        .build();
+
+    locatorLauncher.start();
+
+    Cache cache = locatorLauncher.getCache();
+
+    SystemManagementService service =
+        (SystemManagementService) ManagementService.getManagementService(cache);
+    service.startManager();
+    FederatingManager federatingManager = service.getFederatingManager();
+    proxyFactory = federatingManager.getProxyFactory();
+
+    return locatorLauncher.getPort();
+  }
+
+  private void startServer() throws IOException {
+    serverLauncher = new ServerLauncher.Builder()
+        .setDisableDefaultServer(true)
+        .setMemberName(serverName)
+        .setWorkingDirectory(temporaryFolder.newFolder(serverName).getAbsolutePath())
+        .set(HTTP_SERVICE_PORT, "0")
+        .set(LOCATORS, "localHost[" + locatorPort + "]")
+        .build();
+
+    serverLauncher.start();
+  }
+
+  private static class FederatingManagerFactoryWithSpy implements FederatingManagerFactory {
+
+    public FederatingManagerFactoryWithSpy() {
+      // must be public for instantiation by reflection
+    }
+
+    @Override
+    public FederatingManager create(ManagementResourceRepo repo, InternalDistributedSystem system,
+        SystemManagementService service, InternalCache cache, StatisticsFactory statisticsFactory,
+        StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory, MemberMessenger messenger,
+        ExecutorService executorService) {
+      return new FederatingManager(repo, system, service, cache, statisticsFactory,
+          statisticsClock, spy(proxyFactory), messenger, executorService);
+    }
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorPathDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorPathDUnitTest.java
deleted file mode 100644
index f3f17b0..0000000
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorPathDUnitTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.management.internal;
-
-import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.spy;
-
-import java.rmi.RemoteException;
-
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.management.ManagementService;
-import org.apache.geode.test.dunit.internal.InternalBlackboard;
-import org.apache.geode.test.dunit.internal.InternalBlackboardImpl;
-import org.apache.geode.test.dunit.rules.ClusterStartupRule;
-import org.apache.geode.test.dunit.rules.MemberVM;
-import org.apache.geode.test.junit.categories.JMXTest;
-import org.apache.geode.test.junit.rules.LocatorStarterRule;
-
-@Category({JMXTest.class})
-public class MBeanFederationErrorPathDUnitTest {
-  private static final int SERVER_1_VM_INDEX = 1;
-  private static final String REGION_NAME = "test-region-1";
-
-  public MemberVM server1, server2, server3;
-
-  @Rule
-  public LocatorStarterRule locator1 = new LocatorStarterRule();
-
-  @Rule
-  public ClusterStartupRule lsRule = new ClusterStartupRule();
-
-
-  private InternalBlackboard bb;
-
-  @Before
-  public void before() throws Exception {
-    locator1.withJMXManager().startLocator();
-
-    bb = InternalBlackboardImpl.getInstance();
-  }
-
-  @Test
-  public void destroyMBeanBeforeFederationCompletes()
-      throws MalformedObjectNameException, RemoteException {
-    String bbKey = "sync1";
-
-    String beanName = "GemFire:service=Region,name=\"/test-region-1\",type=Member,member=server-1";
-    ObjectName objectName = new ObjectName(beanName);
-
-    InternalCache cache = locator1.getCache();
-    SystemManagementService service =
-        (SystemManagementService) ManagementService.getManagementService(cache);
-    FederatingManager federatingManager = service.getFederatingManager();
-    MBeanProxyFactory mBeanProxyFactory = federatingManager.getProxyFactory();
-    MBeanProxyFactory spy = spy(mBeanProxyFactory);
-    service.getFederatingManager().setProxyFactory(spy);
-
-    Answer answer1 = new Answer<Object>() {
-      @Override
-      public Object answer(InvocationOnMock invocation) throws Throwable {
-        server1.invoke(() -> {
-          InternalCache serverCache = ClusterStartupRule.getCache();
-          Region region = serverCache.getRegionByPath("/" + REGION_NAME);
-          region.destroyRegion();
-        });
-
-        Region<String, Object> monitoringRegion = invocation.getArgument(2);
-        monitoringRegion.destroy(objectName.toString());
-
-        assertThat((monitoringRegion).get(objectName.toString())).isNull();
-
-        try {
-          invocation.callRealMethod();
-        } catch (Exception e) {
-          bb.setMailbox(bbKey, e);
-          return null;
-        }
-        bb.setMailbox(bbKey, "this is fine");
-        return null;
-      }
-    };
-
-    doAnswer(answer1).when(spy).createProxy(any(), eq(objectName), any(), any());
-
-    server1 = lsRule.startServerVM(SERVER_1_VM_INDEX, locator1.getPort());
-
-    server1.invoke(() -> {
-      InternalCache cache1 = ClusterStartupRule.getCache();
-      cache1.createRegionFactory(RegionShortcut.REPLICATE).create(REGION_NAME);
-    });
-
-    await().until(() -> bb.getMailbox(bbKey) != null);
-    Object e = bb.getMailbox("sync1");
-
-    assertThat(e).isNotInstanceOf(NullPointerException.class);
-    assertThat((String) e).contains("this is fine");
-  }
-}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/FederatingManagerIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/FederatingManagerIntegrationTest.java
deleted file mode 100644
index f5cbaad..0000000
--- a/geode-core/src/integrationTest/java/org/apache/geode/management/FederatingManagerIntegrationTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.management;
-
-import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.net.InetAddress;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.management.internal.FederatingManager;
-import org.apache.geode.management.internal.MemberMessenger;
-import org.apache.geode.management.internal.SystemManagementService;
-import org.apache.geode.test.junit.categories.JMXTest;
-import org.apache.geode.test.junit.rules.ServerStarterRule;
-
-@Category({JMXTest.class})
-public class FederatingManagerIntegrationTest {
-
-  @Rule
-  public ServerStarterRule serverRule = new ServerStarterRule();
-
-  @Test
-  public void testFederatingManagerConcurrency() throws Exception {
-    serverRule.startServer();
-    SystemManagementService service =
-        (SystemManagementService) ManagementService
-            .getExistingManagementService(serverRule.getCache());
-    service.createManager();
-    FederatingManager manager = service.getFederatingManager();
-
-    MemberMessenger messenger = mock(MemberMessenger.class);
-    manager.setMessenger(messenger);
-
-    manager.startManager();
-
-    InternalDistributedMember mockMember = mock(InternalDistributedMember.class);
-    when(mockMember.getInetAddress()).thenReturn(InetAddress.getLocalHost());
-    when(mockMember.getId()).thenReturn("member-1");
-
-    for (int i = 0; i < 100; i++) {
-      manager.addMember(mockMember);
-    }
-
-    await()
-        .until(() -> serverRule.getCache().getAllRegions().size() > 1);
-    assertThat(manager.getAndResetLatestException()).isNull();
-  }
-}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java
new file mode 100644
index 0000000..355e3ec
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal;
+
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.internal.net.SocketCreator.getLocalHost;
+import static org.apache.geode.management.internal.SystemManagementService.FEDERATING_MANAGER_FACTORY_PROPERTY;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.ExecutorService;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.statistics.StatisticsClock;
+import org.apache.geode.management.ManagementService;
+import org.apache.geode.test.junit.categories.JMXTest;
+
+@Category(JMXTest.class)
+public class FederatingManagerIntegrationTest {
+
+  private InternalCache cache;
+  private FederatingManager federatingManager;
+
+  @Rule
+  public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
+  @Before
+  public void setUp() {
+    System.setProperty(FEDERATING_MANAGER_FACTORY_PROPERTY,
+        FederatingManagerFactoryWithMockMessenger.class.getName());
+
+    cache = (InternalCache) new CacheFactory()
+        .set(LOCATORS, "")
+        .create();
+
+    SystemManagementService managementService =
+        (SystemManagementService) ManagementService.getExistingManagementService(cache);
+    managementService.createManager();
+    federatingManager = managementService.getFederatingManager();
+    federatingManager.startManager();
+  }
+
+  @After
+  public void tearDown() {
+    cache.close();
+  }
+
+  @Test
+  public void testFederatingManagerConcurrency() throws UnknownHostException {
+    InternalDistributedMember member = member();
+
+    for (int i = 1; i <= 100; i++) {
+      federatingManager.addMember(member);
+    }
+
+    await().until(() -> !cache.getAllRegions().isEmpty());
+
+    assertThat(federatingManager.getAndResetLatestException()).isNull();
+  }
+
+  private InternalDistributedMember member() throws UnknownHostException {
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    when(member.getInetAddress()).thenReturn(getLocalHost());
+    when(member.getId()).thenReturn("member-1");
+    return member;
+  }
+
+  private static class FederatingManagerFactoryWithMockMessenger
+      implements FederatingManagerFactory {
+
+    public FederatingManagerFactoryWithMockMessenger() {
+      // must be public for instantiation by reflection
+    }
+
+    @Override
+    public FederatingManager create(ManagementResourceRepo repo, InternalDistributedSystem system,
+        SystemManagementService service, InternalCache cache, StatisticsFactory statisticsFactory,
+        StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory, MemberMessenger messenger,
+        ExecutorService executorService) {
+      return new FederatingManager(repo, system, service, cache, statisticsFactory,
+          statisticsClock, proxyFactory, mock(MemberMessenger.class), executorService);
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/RegionDestroyedException.java b/geode-core/src/main/java/org/apache/geode/cache/RegionDestroyedException.java
index 66abcf8..7a748b1 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/RegionDestroyedException.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/RegionDestroyedException.java
@@ -18,15 +18,15 @@
  * Indicates that the region has been destroyed. Further operations on the region object are not
  * allowed.
  *
- *
  * @since GemFire 2.0
  */
 public class RegionDestroyedException extends CacheRuntimeException {
   private static final long serialVersionUID = 319804842308010754L;
-  private String regionFullPath;
+
+  private final String regionFullPath;
 
   /**
-   * Constructs a <code>RegionDestroyedException</code> with a message.
+   * Constructs a {@code RegionDestroyedException} with a message.
    *
    * @param msg the String message
    */
@@ -36,7 +36,7 @@
   }
 
   /**
-   * Constructs a <code>RegionDestroyedException</code> with a message and a cause.
+   * Constructs a {@code RegionDestroyedException} with a message and a cause.
    *
    * @param s the String message
    * @param ex the Throwable cause
@@ -47,6 +47,6 @@
   }
 
   public String getRegionFullPath() {
-    return this.regionFullPath;
+    return regionFullPath;
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
index ca735c0..c5168a2 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
@@ -40,6 +40,7 @@
 import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.RegionExistsException;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.cache.TimeoutException;
@@ -51,7 +52,6 @@
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalRegionArguments;
 import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.LoggingExecutors;
 import org.apache.geode.internal.statistics.StatisticsClock;
 import org.apache.geode.management.ManagementException;
 
@@ -65,7 +65,10 @@
  * @since GemFire 7.0
  */
 public class FederatingManager extends Manager {
-  public static final Logger logger = LogService.getLogger();
+  private static final Logger logger = LogService.getLogger();
+
+  private final SystemManagementService service;
+  private final AtomicReference<Exception> latestException = new AtomicReference<>();
 
   /**
    * This Executor uses a pool of thread to execute the member addition /removal tasks, This will
@@ -73,31 +76,19 @@
    * unbounded in practical situation as number of members will be a finite set at any given point
    * of time
    */
-  private ExecutorService pooledMembershipExecutor;
+  private final ExecutorService executorService;
+  private final MBeanProxyFactory proxyFactory;
+  private final MemberMessenger messenger;
 
-  /**
-   * Proxy factory is used to create , remove proxies
-   */
-  private MBeanProxyFactory proxyFactory;
-
-  private MemberMessenger messenger;
-
-  private final SystemManagementService service;
-
-  private final AtomicReference<Exception> latestException = new AtomicReference<>(null);
-
-  FederatingManager(MBeanJMXAdapter jmxAdapter, ManagementResourceRepo repo,
-      InternalDistributedSystem system, SystemManagementService service, InternalCache cache,
-      StatisticsFactory statisticsFactory, StatisticsClock statisticsClock) {
+  FederatingManager(ManagementResourceRepo repo, InternalDistributedSystem system,
+      SystemManagementService service, InternalCache cache, StatisticsFactory statisticsFactory,
+      StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory, MemberMessenger messenger,
+      ExecutorService executorService) {
     super(repo, system, cache, statisticsFactory, statisticsClock);
     this.service = service;
-    proxyFactory = new MBeanProxyFactory(jmxAdapter, service);
-    messenger = new MemberMessenger(jmxAdapter, system);
-  }
-
-  @VisibleForTesting
-  void setProxyFactory(MBeanProxyFactory newProxyFactory) {
-    proxyFactory = newProxyFactory;
+    this.proxyFactory = proxyFactory;
+    this.messenger = messenger;
+    this.executorService = executorService;
   }
 
   /**
@@ -111,9 +102,6 @@
         logger.debug("Starting the Federating Manager.... ");
       }
 
-      pooledMembershipExecutor = LoggingExecutors.newFixedThreadPool("FederatingManager", true,
-          Runtime.getRuntime().availableProcessors());
-
       running = true;
       startManagingActivity();
       messenger.broadcastManagerInfo();
@@ -136,46 +124,13 @@
     stopManagingActivity();
   }
 
-  /**
-   * This method will be invoked whenever a member stops being a managing node. The
-   * {@code ManagementException} has to be handled by the caller.
-   */
-  private void stopManagingActivity() {
-    try {
-      pooledMembershipExecutor.shutdownNow();
-
-      for (DistributedMember distributedMember : repo.getMonitoringRegionMap().keySet()) {
-        removeMemberArtifacts(distributedMember, false);
-      }
-    } catch (Exception e) {
-      throw new ManagementException(e);
-    }
-  }
-
   @Override
   public boolean isRunning() {
     return running;
   }
 
-  /**
-   * This method will be invoked from MembershipListener which is registered when the member becomes
-   * a Management node.
-   *
-   * <p>
-   * This method will delegate task to another thread and exit, so that it wont block the membership
-   * listener
-   */
-  @VisibleForTesting
-  public void addMember(InternalDistributedMember member) {
-    GIITask giiTask = new GIITask(member);
-    executeTask(() -> {
-      try {
-        giiTask.call();
-      } catch (RuntimeException e) {
-        logger.warn("Error federating new member {}", member.getId(), e);
-        latestException.set(e);
-      }
-    });
+  public MemberMessenger getMessenger() {
+    return messenger;
   }
 
   /**
@@ -190,38 +145,6 @@
     executeTask(new RemoveMemberTask(member, crashed));
   }
 
-  private void executeTask(Runnable task) {
-    try {
-      pooledMembershipExecutor.execute(task);
-    } catch (RejectedExecutionException ignored) {
-      // Ignore, we are getting shutdown
-    }
-  }
-
-  private void removeMemberArtifacts(DistributedMember member, boolean crashed) {
-    Region<String, Object> proxyRegion = repo.getEntryFromMonitoringRegionMap(member);
-    Region<NotificationKey, Notification> notificationRegion =
-        repo.getEntryFromNotifRegionMap(member);
-
-    if (proxyRegion == null && notificationRegion == null) {
-      return;
-    }
-
-    repo.romoveEntryFromMonitoringRegionMap(member);
-    repo.removeEntryFromNotifRegionMap(member);
-
-    // If cache is closed all the regions would have been destroyed implicitly
-    if (!cache.isClosed()) {
-      proxyFactory.removeAllProxies(member, proxyRegion);
-      proxyRegion.localDestroyRegion();
-      notificationRegion.localDestroyRegion();
-    }
-
-    if (!system.getDistributedMember().equals(member)) {
-      service.memberDeparted((InternalDistributedMember) member, crashed);
-    }
-  }
-
   /**
    * This method will be invoked from MembershipListener which is registered when the member becomes
    * a Management node.
@@ -236,6 +159,66 @@
   }
 
   /**
+   * This will return the last updated time of the proxyMBean.
+   *
+   * @param objectName {@link ObjectName} of the MBean
+   *
+   * @return last updated time of the proxy
+   */
+  long getLastUpdateTime(ObjectName objectName) {
+    return proxyFactory.getLastUpdateTime(objectName);
+  }
+
+  /**
+   * Find a particular proxy instance for a {@link ObjectName}, {@link DistributedMember} and
+   * interface class If the proxy interface does not implement the given interface class a
+   * {@link ClassCastException} will be thrown
+   *
+   * @param objectName {@link ObjectName} of the MBean
+   * @param interfaceClass interface class implemented by proxy
+   *
+   * @return an instance of proxy exposing the given interface
+   */
+  <T> T findProxy(ObjectName objectName, Class<T> interfaceClass) {
+    return proxyFactory.findProxy(objectName, interfaceClass);
+  }
+
+  /**
+   * Find a set of proxies given a {@link DistributedMember}.
+   *
+   * @param member {@link DistributedMember}
+   *
+   * @return a set of {@link ObjectName}
+   */
+  Set<ObjectName> findAllProxies(DistributedMember member) {
+    return proxyFactory.findAllProxies(member);
+  }
+
+  /**
+   * This method will be invoked whenever a member stops being a managing node. The
+   * {@code ManagementException} has to be handled by the caller.
+   */
+  private void stopManagingActivity() {
+    try {
+      executorService.shutdownNow();
+
+      for (DistributedMember distributedMember : repo.getMonitoringRegionMap().keySet()) {
+        removeMemberArtifacts(distributedMember, false);
+      }
+    } catch (Exception e) {
+      throw new ManagementException(e);
+    }
+  }
+
+  private void executeTask(Runnable task) {
+    try {
+      executorService.execute(task);
+    } catch (RejectedExecutionException ignored) {
+      // Ignore, we are getting shutdown
+    }
+  }
+
+  /**
    * This method will be invoked when a node transitions from managed node to managing node This
    * method will block for all GIIs to be completed But each GII is given a specific time frame.
    * After that the task will be marked as cancelled.
@@ -255,7 +238,7 @@
         logger.debug("Management Resource creation started  : ");
       }
       List<Future<InternalDistributedMember>> futureTaskList =
-          pooledMembershipExecutor.invokeAll(giiTaskList);
+          executorService.invokeAll(giiTaskList);
 
       for (Future<InternalDistributedMember> futureTask : futureTaskList) {
         try {
@@ -301,60 +284,70 @@
   }
 
   /**
-   * For internal Use only
+   * This method will be invoked from MembershipListener which is registered when the member becomes
+   * a Management node.
+   *
+   * <p>
+   * This method will delegate task to another thread and exit, so that it wont block the membership
+   * listener
    */
   @VisibleForTesting
+  void addMember(InternalDistributedMember member) {
+    GIITask giiTask = new GIITask(member);
+    executeTask(() -> {
+      try {
+        giiTask.call();
+      } catch (RuntimeException e) {
+        logger.warn("Error federating new member {}", member.getId(), e);
+        latestException.set(e);
+      }
+    });
+  }
+
+  @VisibleForTesting
+  void removeMemberArtifacts(DistributedMember member, boolean crashed) {
+    Region<String, Object> monitoringRegion = repo.getEntryFromMonitoringRegionMap(member);
+    Region<NotificationKey, Notification> notificationRegion =
+        repo.getEntryFromNotifRegionMap(member);
+
+    if (monitoringRegion == null && notificationRegion == null) {
+      return;
+    }
+
+    repo.romoveEntryFromMonitoringRegionMap(member);
+    repo.removeEntryFromNotifRegionMap(member);
+
+    // If cache is closed all the regions would have been destroyed implicitly
+    if (!cache.isClosed()) {
+      try {
+        proxyFactory.removeAllProxies(member, monitoringRegion);
+      } catch (RegionDestroyedException ignore) {
+        // ignored
+      }
+      try {
+        monitoringRegion.localDestroyRegion();
+      } catch (RegionDestroyedException ignore) {
+        // ignored
+      }
+      try {
+        notificationRegion.localDestroyRegion();
+      } catch (RegionDestroyedException ignore) {
+        // ignored
+      }
+    }
+
+    if (!system.getDistributedMember().equals(member)) {
+      service.memberDeparted((InternalDistributedMember) member, crashed);
+    }
+  }
+
+  @VisibleForTesting
   public MBeanProxyFactory getProxyFactory() {
     return proxyFactory;
   }
 
-  /**
-   * This will return the last updated time of the proxyMBean.
-   *
-   * @param objectName {@link ObjectName} of the MBean
-   *
-   * @return last updated time of the proxy
-   */
-  long getLastUpdateTime(ObjectName objectName) {
-    return proxyFactory.getLastUpdateTime(objectName);
-  }
-
-  /**
-   * Find a particular proxy instance for a {@link ObjectName}, {@link DistributedMember} and
-   * interface class If the proxy interface does not implement the given interface class a
-   * {@link ClassCastException} will be thrown
-   *
-   * @param objectName {@link ObjectName} of the MBean
-   * @param interfaceClass interface class implemented by proxy
-   *
-   * @return an instance of proxy exposing the given interface
-   */
-  <T> T findProxy(ObjectName objectName, Class<T> interfaceClass) {
-    return proxyFactory.findProxy(objectName, interfaceClass);
-  }
-
-  /**
-   * Find a set of proxies given a {@link DistributedMember}.
-   *
-   * @param member {@link DistributedMember}
-   *
-   * @return a set of {@link ObjectName}
-   */
-  Set<ObjectName> findAllProxies(DistributedMember member) {
-    return proxyFactory.findAllProxies(member);
-  }
-
-  public MemberMessenger getMessenger() {
-    return messenger;
-  }
-
   @VisibleForTesting
-  public void setMessenger(MemberMessenger messenger) {
-    this.messenger = messenger;
-  }
-
-  @VisibleForTesting
-  public synchronized Exception getAndResetLatestException() {
+  synchronized Exception getAndResetLatestException() {
     return latestException.getAndSet(null);
   }
 
@@ -500,7 +493,7 @@
 
       // Before completing task intimate all listening ProxyListener which might send
       // notifications.
-      service.memberJoined((InternalDistributedMember) member);
+      service.memberJoined(member);
 
       // Send manager info to the added member
       messenger.sendManagerInfo(member);
@@ -538,7 +531,7 @@
 
     private final InternalDistributedMember member;
 
-    GIITask(InternalDistributedMember member) {
+    private GIITask(InternalDistributedMember member) {
       this.member = member;
     }
 
@@ -554,7 +547,7 @@
     private final DistributedMember member;
     private final boolean crashed;
 
-    RemoveMemberTask(DistributedMember member, boolean crashed) {
+    private RemoveMemberTask(DistributedMember member, boolean crashed) {
       this.member = member;
       this.crashed = crashed;
     }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManagerFactory.java b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManagerFactory.java
new file mode 100644
index 0000000..ed2e62b
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManagerFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.management.internal;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.statistics.StatisticsClock;
+
+@FunctionalInterface
+interface FederatingManagerFactory {
+
+  FederatingManager create(ManagementResourceRepo repo, InternalDistributedSystem system,
+      SystemManagementService service, InternalCache cache, StatisticsFactory statisticsFactory,
+      StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory, MemberMessenger messenger,
+      ExecutorService executorService);
+}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java b/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java
index 5962d29..7a48465 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java
@@ -15,6 +15,7 @@
 package org.apache.geode.management.internal;
 
 import org.apache.geode.StatisticsFactory;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalCacheForClientAccess;
@@ -34,12 +35,12 @@
   /**
    * depicts whether this node is a Managing node or not
    */
-  protected volatile boolean running = false;
+  protected volatile boolean running;
 
   /**
    * depicts whether this node is a Managing node or not
    */
-  protected volatile boolean stopCacheOps = false;
+  protected volatile boolean stopCacheOps;
 
   /**
    * This is a single window to manipulate region resources for management
@@ -70,9 +71,7 @@
 
   public abstract void stopManager();
 
-  /**
-   * For internal use only
-   */
+  @VisibleForTesting
   public ManagementResourceRepo getManagementResourceRepo() {
     return repo;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java b/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
index 79d99d4..9ba9be1 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
@@ -14,11 +14,17 @@
  */
 package org.apache.geode.management.internal;
 
+import static java.util.Arrays.asList;
+import static java.util.stream.Collectors.toSet;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.stream.Collectors;
+import java.util.concurrent.ExecutorService;
 
 import javax.management.Notification;
 import javax.management.ObjectName;
@@ -27,6 +33,8 @@
 
 import org.apache.geode.CancelException;
 import org.apache.geode.StatisticsFactory;
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystemDisconnectedException;
@@ -36,6 +44,7 @@
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalCacheForClientAccess;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.LoggingExecutors;
 import org.apache.geode.internal.statistics.StatisticsClock;
 import org.apache.geode.management.AlreadyRunningException;
 import org.apache.geode.management.AsyncEventQueueMXBean;
@@ -52,7 +61,6 @@
 import org.apache.geode.management.ManagerMXBean;
 import org.apache.geode.management.MemberMXBean;
 import org.apache.geode.management.RegionMXBean;
-import org.apache.geode.management.internal.beans.ManagementAdapter;
 import org.apache.geode.management.membership.MembershipEvent;
 import org.apache.geode.management.membership.MembershipListener;
 
@@ -65,53 +73,33 @@
 public class SystemManagementService extends BaseManagementService {
   private static final Logger logger = LogService.getLogger();
 
+  @Immutable
+  @VisibleForTesting
+  static final String FEDERATING_MANAGER_FACTORY_PROPERTY = "FEDERATING_MANAGER_FACTORY";
+
   /**
    * The concrete implementation of DistributedSystem that provides internal-only functionality.
    */
   private final InternalDistributedSystem system;
 
   /**
-   * core component for distribution
-   */
-  private LocalManager localManager;
-
-  /**
    * This is a notification hub to listen all the notifications emitted from all the MBeans in a
    * peer cache./cache server
    */
   private final NotificationHub notificationHub;
 
   /**
-   * whether the service is closed or not if cache is closed automatically this service will be
-   * closed
-   */
-  private volatile boolean closed = false;
-
-  /**
-   * has the management service has started yet
-   */
-  private volatile boolean isStarted = false;
-
-  /**
    * Adapter to interact with platform MBean server
    */
   private final MBeanJMXAdapter jmxAdapter;
 
   private final InternalCacheForClientAccess cache;
 
-  private FederatingManager federatingManager;
-
   private final ManagementAgent agent;
 
   private final ManagementResourceRepo repo;
 
   /**
-   * This membership listener will listen on membership events after the node has transformed into a
-   * Managing node.
-   */
-  private ManagementMembershipListener listener;
-
-  /**
    * Proxy aggregator to create aggregate MBeans e.g. DistributedSystem and DistributedRegion
    * GemFire comes with a default aggregator.
    */
@@ -122,19 +110,38 @@
 
   private final StatisticsFactory statisticsFactory;
   private final StatisticsClock statisticsClock;
+  private final FederatingManagerFactory federatingManagerFactory;
 
-  public static BaseManagementService newSystemManagementService(
+  /**
+   * whether the service is closed or not if cache is closed automatically this service will be
+   * closed
+   */
+  private volatile boolean closed;
+
+  /**
+   * has the management service has started yet
+   */
+  private volatile boolean isStarted;
+
+  private LocalManager localManager;
+
+  private FederatingManager federatingManager;
+
+  /**
+   * This membership listener will listen on membership events after the node has transformed into a
+   * Managing node.
+   */
+  private ManagementMembershipListener listener;
+
+  static BaseManagementService newSystemManagementService(
       InternalCacheForClientAccess cache) {
     return new SystemManagementService(cache).init();
   }
 
-  protected SystemManagementService(InternalCacheForClientAccess cache) {
+  private SystemManagementService(InternalCacheForClientAccess cache) {
     this.cache = cache;
-    this.system = cache.getInternalDistributedSystem();
-    // This is a safe check to ensure Management service does not start for a
-    // system which is disconnected.
-    // Most likely scenario when this will happen is when a cache is closed and we are at this
-    // point.
+    system = cache.getInternalDistributedSystem();
+
     if (!system.isConnected()) {
       throw new DistributedSystemDisconnectedException(
           "This connection to a distributed system has been disconnected.");
@@ -142,116 +149,49 @@
 
     statisticsFactory = system.getStatisticsManager();
     statisticsClock = cache.getStatisticsClock();
+    jmxAdapter = new MBeanJMXAdapter(system.getDistributedMember());
+    repo = new ManagementResourceRepo();
+    notificationHub = new NotificationHub(repo);
 
-    this.jmxAdapter = new MBeanJMXAdapter(this.system.getDistributedMember());
-    this.repo = new ManagementResourceRepo();
-
-    this.notificationHub = new NotificationHub(repo);
     if (system.getConfig().getJmxManager()) {
-      this.agent = new ManagementAgent(system.getConfig(), cache);
+      agent = new ManagementAgent(system.getConfig(), cache);
     } else {
-      this.agent = null;
+      agent = null;
     }
-    ManagementFunction function = new ManagementFunction(notificationHub);
-    FunctionService.registerFunction(function);
-    this.proxyListeners = new CopyOnWriteArrayList<>();
-  }
 
-  /**
-   * This method will initialize all the internal components for Management and Monitoring
-   *
-   * It will a)start an JMX connectorServer b) create a notification hub c)register the
-   * ManagementFunction
-   */
-  private SystemManagementService init() {
-    try {
-      this.localManager =
-          new LocalManager(repo, system, this, cache, statisticsFactory, statisticsClock);
-      this.localManager.startManager();
-      this.listener = new ManagementMembershipListener(this);
-      system.getDistributionManager().addMembershipListener(listener);
-      isStarted = true;
-      return this;
-    } catch (CancelException e) {
-      // Rethrow all CancelExceptions (fix for defect 46339)
-      throw e;
-    } catch (Exception e) {
-      // Wrap all other exceptions as ManagementExceptions
-      logger.error(e.getMessage(), e);
-      throw new ManagementException(e);
-    }
-  }
+    FunctionService.registerFunction(new ManagementFunction(notificationHub));
 
-  /**
-   * For internal Use only
-   */
-  public LocalManager getLocalManager() {
-    return localManager;
-  }
-
-  public NotificationHub getNotificationHub() {
-    return notificationHub;
-  }
-
-  public FederatingManager getFederatingManager() {
-    return federatingManager;
-  }
-
-  public MBeanJMXAdapter getJMXAdapter() {
-    return jmxAdapter;
-  }
-
-  public ManagementAgent getManagementAgent() {
-    return agent;
-  }
-
-  public boolean isStartedAndOpen() {
-    return isStarted && !closed && system.isConnected();
-  }
-
-  private void verifyManagementService() {
-    if (!isStarted) {
-      throw new ManagementException(
-          "Management Service Not Started Yet");
-    }
-    if (!system.isConnected()) {
-      throw new ManagementException(
-          "Not Connected To Distributed System");
-    }
-    if (closed) {
-      throw new ManagementException(
-          "Management Service Is Closed");
-    }
+    proxyListeners = new CopyOnWriteArrayList<>();
+    federatingManagerFactory = createFederatingManagerFactory();
   }
 
   @Override
   public void close() {
     synchronized (instances) {
       if (closed) {
-        // its a no op, hence not logging any exception
         return;
       }
+
       if (logger.isDebugEnabled()) {
         logger.debug("Closing Management Service");
       }
       if (listener != null && system.isConnected()) {
         system.getDistributionManager().removeMembershipListener(listener);
       }
-      // Stop the Federating Manager first . It will ensure MBeans are not getting federated.
-      // while un-registering
+      // Stop the Federating Manager first to avoid federating while un-registering
       if (federatingManager != null && federatingManager.isRunning()) {
         federatingManager.stopManager();
       }
-      this.notificationHub.cleanUpListeners();
+      notificationHub.cleanUpListeners();
       jmxAdapter.cleanJMXResource();
       if (localManager.isRunning()) {
         localManager.stopManager();
       }
-      if (this.agent != null && this.agent.isRunning()) {
-        this.agent.stopAgent();
+      if (agent != null && agent.isRunning()) {
+        agent.stopAgent();
       }
 
-      getInternalCache().getJmxManagerAdvisor().broadcastChange();
+      cache.getJmxManagerAdvisor().broadcastChange();
       instances.remove(cache);
       localManager = null;
       closed = true;
@@ -262,31 +202,28 @@
   public <T> void federate(ObjectName objectName, Class<T> interfaceClass,
       boolean notificationEmitter) {
     verifyManagementService();
-    if (!objectName.getDomain().equalsIgnoreCase(ManagementConstants.OBJECTNAME__DEFAULTDOMAIN)) {
-      throw new ManagementException(
-          "Not A GemFire Domain MBean, can not Federate");
-    }
 
+    if (!objectName.getDomain().equalsIgnoreCase(ManagementConstants.OBJECTNAME__DEFAULTDOMAIN)) {
+      throw new ManagementException("Not A GemFire Domain MBean, can not Federate");
+    }
     if (!jmxAdapter.isRegistered(objectName)) {
-      throw new ManagementException(
-          "MBean Not Registered In GemFire Domain");
+      throw new ManagementException("MBean Not Registered In GemFire Domain");
     }
     if (notificationEmitter && !jmxAdapter.hasNotificationSupport(objectName)) {
-      throw new ManagementException(
-          "MBean Does Not Have Notification Support");
+      throw new ManagementException("MBean Does Not Have Notification Support");
     }
 
     // All validation Passed. Now create the federation Component
     Object object = jmxAdapter.getMBeanObject(objectName);
-    FederationComponent fedComp =
+    FederationComponent federationComponent =
         new FederationComponent(object, objectName, interfaceClass, notificationEmitter);
-    if (ManagementAdapter.refreshOnInit.contains(interfaceClass)) {
-      fedComp.refreshObjectState(true);// Fixes 46387
+    if (asList(RegionMXBean.class, MemberMXBean.class).contains(interfaceClass)) {
+      federationComponent.refreshObjectState(true);
     }
-    localManager.markForFederation(objectName, fedComp);
+    localManager.markForFederation(objectName, federationComponent);
 
     if (isManager()) {
-      afterCreateProxy(objectName, interfaceClass, object, fedComp);
+      afterCreateProxy(objectName, interfaceClass, object, federationComponent);
     }
   }
 
@@ -302,8 +239,8 @@
     }
     if (federatingManager == null) {
       return 0;
-
-    } else if (!federatingManager.isRunning()) {
+    }
+    if (!federatingManager.isRunning()) {
       return 0;
     }
     if (jmxAdapter.isLocalMBean(objectName)) {
@@ -327,20 +264,6 @@
     return jmxAdapter.getLocalRegionMXBean(regionPath);
   }
 
-  public <T> T getMBeanProxy(ObjectName objectName, Class<T> interfaceClass) {
-    if (!isStartedAndOpen()) {
-      return null;
-    }
-    if (federatingManager == null) {
-      return null;
-
-    } else if (!federatingManager.isRunning()) {
-      return null;
-    }
-
-    return federatingManager.findProxy(objectName, interfaceClass);
-  }
-
   @Override
   public MemberMXBean getMemberMXBean() {
     return jmxAdapter.getMemberMXBean();
@@ -353,22 +276,21 @@
     }
     if (system.getDistributedMember().equals(member)) {
       return jmxAdapter.getLocalGemFireMBean().keySet();
-    } else {
-      if (federatingManager == null) {
-        return Collections.emptySet();
-
-      } else if (!federatingManager.isRunning()) {
-        return Collections.emptySet();
-      }
-      return federatingManager.findAllProxies(member);
     }
+    if (federatingManager == null) {
+      return Collections.emptySet();
+    }
+    if (!federatingManager.isRunning()) {
+      return Collections.emptySet();
+    }
+    return federatingManager.findAllProxies(member);
   }
 
   @Override
   public Set<ObjectName> getAsyncEventQueueMBeanNames(DistributedMember member) {
-    Set<ObjectName> mBeanNames = this.queryMBeanNames(member);
-    return mBeanNames.stream().filter(x -> "AsyncEventQueue".equals(x.getKeyProperty("service")))
-        .collect(Collectors.toSet());
+    return queryMBeanNames(member).stream()
+        .filter(x -> "AsyncEventQueue".equals(x.getKeyProperty("service")))
+        .collect(toSet());
   }
 
   @Override
@@ -377,24 +299,20 @@
     return jmxAdapter.registerMBean(object, objectName, false);
   }
 
-  public ObjectName registerInternalMBean(Object object, ObjectName objectName) {
-    verifyManagementService();
-    return jmxAdapter.registerMBean(object, objectName, true);
-  }
-
   @Override
   public void unregisterMBean(ObjectName objectName) {
     if (!isStartedAndOpen()) {
       return;
     }
+
     verifyManagementService();
 
     if (isManager()) {
-      FederationComponent removedObj = localManager.getFedComponents().get(objectName);
-      if (removedObj != null) { // only for MBeans local to Manager , not
-                                // proxies
-        afterRemoveProxy(objectName, removedObj.getInterfaceClass(), removedObj.getMBeanObject(),
-            removedObj);
+      FederationComponent removed = localManager.getFedComponents().get(objectName);
+      if (removed != null) {
+        // only for MBeans local to Manager, not proxies
+        afterRemoveProxy(objectName, removed.getInterfaceClass(), removed.getMBeanObject(),
+            removed);
       }
     }
 
@@ -407,19 +325,16 @@
     return isManagerCreated() && federatingManager.isRunning();
   }
 
-  public boolean isManagerCreated() {
-    return isStartedAndOpen() && federatingManager != null;
-  }
-
   @Override
   public void startManager() {
-    if (!getInternalCache().getInternalDistributedSystem().getConfig().getJmxManager()) {
-      // fix for 45900
+    if (!cache.getInternalDistributedSystem().getConfig().getJmxManager()) {
       throw new ManagementException(
           "Could not start the manager because the gemfire property \"jmx-manager\" is false.");
     }
+
     synchronized (instances) {
       verifyManagementService();
+
       if (federatingManager != null && federatingManager.isRunning()) {
         throw new AlreadyRunningException(
             "Manager is already running");
@@ -432,15 +347,16 @@
       } else if (!federatingManager.isRunning()) {
         needsToBeStarted = true;
       }
+
       if (needsToBeStarted) {
         boolean started = false;
         try {
           system.handleResourceEvent(ResourceEvent.MANAGER_START, null);
           federatingManager.startManager();
-          if (this.agent != null) {
-            this.agent.startAgent();
+          if (agent != null) {
+            agent.startAgent();
           }
-          getInternalCache().getJmxManagerAdvisor().broadcastChange();
+          cache.getJmxManagerAdvisor().broadcastChange();
           started = true;
         } catch (RuntimeException | Error e) {
           logger.error("Jmx manager could not be started because {}", e.getMessage(), e);
@@ -457,27 +373,6 @@
     }
   }
 
-  private InternalCache getInternalCache() {
-    return this.cache;
-  }
-
-  /**
-   * Creates a Manager instance in stopped state.
-   */
-  public boolean createManager() {
-    synchronized (instances) {
-      if (federatingManager != null) {
-        return false;
-      }
-      system.handleResourceEvent(ResourceEvent.MANAGER_CREATE, null);
-      // An initialised copy of federating manager
-      federatingManager = new FederatingManager(jmxAdapter, repo, system, this, cache,
-          statisticsFactory, statisticsClock);
-      getInternalCache().getJmxManagerAdvisor().broadcastChange();
-      return true;
-    }
-  }
-
   /**
    * It will stop the federating Manager and restart the Local cache operation
    */
@@ -488,9 +383,9 @@
       if (federatingManager != null) {
         federatingManager.stopManager();
         system.handleResourceEvent(ResourceEvent.MANAGER_STOP, null);
-        getInternalCache().getJmxManagerAdvisor().broadcastChange();
-        if (this.agent != null && this.agent.isRunning()) {
-          this.agent.stopAgent();
+        cache.getJmxManagerAdvisor().broadcastChange();
+        if (agent != null && agent.isRunning()) {
+          agent.stopAgent();
         }
       }
     }
@@ -516,18 +411,6 @@
     return jmxAdapter.getDistributedSystemMXBean();
   }
 
-  public void addProxyListener(ProxyListener listener) {
-    this.proxyListeners.add(listener);
-  }
-
-  public void removeProxyListener(ProxyListener listener) {
-    this.proxyListeners.remove(listener);
-  }
-
-  public List<ProxyListener> getProxyListeners() {
-    return this.proxyListeners;
-  }
-
   @Override
   public ManagerMXBean getManagerMXBean() {
     return jmxAdapter.getManagerMXBean();
@@ -618,36 +501,102 @@
     return jmxAdapter.getLocatorMXBean();
   }
 
-  public boolean afterCreateProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
+  @Override
+  public <T> T getMBeanInstance(ObjectName objectName, Class<T> interfaceClass) {
+    if (jmxAdapter.isLocalMBean(objectName)) {
+      return jmxAdapter.findMBeanByName(objectName, interfaceClass);
+    }
+    return getMBeanProxy(objectName, interfaceClass);
+  }
+
+  @Override
+  public void addMembershipListener(MembershipListener listener) {
+    universalListenerContainer.addMembershipListener(listener);
+  }
+
+  @Override
+  public void removeMembershipListener(MembershipListener listener) {
+    universalListenerContainer.removeMembershipListener(listener);
+  }
+
+  public LocalManager getLocalManager() {
+    return localManager;
+  }
+
+  public FederatingManager getFederatingManager() {
+    return federatingManager;
+  }
+
+  public MBeanJMXAdapter getJMXAdapter() {
+    return jmxAdapter;
+  }
+
+  public ManagementAgent getManagementAgent() {
+    return agent;
+  }
+
+  public <T> T getMBeanProxy(ObjectName objectName, Class<T> interfaceClass) {
+    if (!isStartedAndOpen()) {
+      return null;
+    }
+    if (federatingManager == null) {
+      return null;
+    }
+    if (!federatingManager.isRunning()) {
+      return null;
+    }
+    return federatingManager.findProxy(objectName, interfaceClass);
+  }
+
+  public ObjectName registerInternalMBean(Object object, ObjectName objectName) {
+    verifyManagementService();
+    return jmxAdapter.registerMBean(object, objectName, true);
+  }
+
+  public boolean isManagerCreated() {
+    return isStartedAndOpen() && federatingManager != null;
+  }
+
+  /**
+   * Creates a Manager instance in stopped state.
+   */
+  public boolean createManager() {
+    synchronized (instances) {
+      if (federatingManager != null) {
+        return false;
+      }
+      system.handleResourceEvent(ResourceEvent.MANAGER_CREATE, null);
+      // An initialised copy of federating manager
+      federatingManager = federatingManagerFactory.create(repo, system, this, cache,
+          statisticsFactory, statisticsClock, new MBeanProxyFactory(jmxAdapter, this),
+          new MemberMessenger(jmxAdapter, system),
+          LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+              Runtime.getRuntime().availableProcessors()));
+      cache.getJmxManagerAdvisor().broadcastChange();
+      return true;
+    }
+  }
+
+  public void addProxyListener(ProxyListener listener) {
+    proxyListeners.add(listener);
+  }
+
+  public void removeProxyListener(ProxyListener listener) {
+    proxyListeners.remove(listener);
+  }
+
+  public void afterCreateProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
       FederationComponent newVal) {
     for (ProxyListener listener : proxyListeners) {
       listener.afterCreateProxy(objectName, interfaceClass, proxyObject, newVal);
     }
-    return true;
   }
 
-  public boolean afterPseudoCreateProxy(ObjectName objectName, Class interfaceClass,
-      Object proxyObject, FederationComponent newVal) {
-    for (ProxyListener listener : proxyListeners) {
-      listener.afterPseudoCreateProxy(objectName, interfaceClass, proxyObject, newVal);
-    }
-    return true;
-  }
-
-  public boolean afterRemoveProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
+  public void afterRemoveProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
       FederationComponent oldVal) {
     for (ProxyListener listener : proxyListeners) {
       listener.afterRemoveProxy(objectName, interfaceClass, proxyObject, oldVal);
     }
-    return true;
-  }
-
-  public boolean afterUpdateProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
-      FederationComponent newVal, FederationComponent oldVal) {
-    for (ProxyListener listener : proxyListeners) {
-      listener.afterUpdateProxy(objectName, interfaceClass, proxyObject, newVal, oldVal);
-    }
-    return true;
   }
 
   public void handleNotification(Notification notification) {
@@ -656,52 +605,133 @@
     }
   }
 
-  @Override
-  public <T> T getMBeanInstance(ObjectName objectName, Class<T> interfaceClass) {
-    if (jmxAdapter.isLocalMBean(objectName)) {
-      return jmxAdapter.findMBeanByName(objectName, interfaceClass);
-    } else {
-      return this.getMBeanProxy(objectName, interfaceClass);
-    }
-  }
-
-  public void logFine(String s) {
-    if (logger.isDebugEnabled()) {
-      logger.debug(s);
-    }
-  }
-
-  public void memberJoined(InternalDistributedMember id) {
+  void memberJoined(InternalDistributedMember id) {
     for (ProxyListener listener : proxyListeners) {
       listener.memberJoined(system.getDistributionManager(), id);
     }
   }
 
-  public void memberDeparted(InternalDistributedMember id, boolean crashed) {
+  void memberDeparted(InternalDistributedMember id, boolean crashed) {
     for (ProxyListener listener : proxyListeners) {
       listener.memberDeparted(system.getDistributionManager(), id, crashed);
     }
   }
 
-  public void memberSuspect(InternalDistributedMember id, InternalDistributedMember whoSuspected,
+  void memberSuspect(InternalDistributedMember id, InternalDistributedMember whoSuspected,
       String reason) {
     for (ProxyListener listener : proxyListeners) {
       listener.memberSuspect(system.getDistributionManager(), id, whoSuspected, reason);
     }
   }
 
-  public void quorumLost(Set<InternalDistributedMember> failures,
-      List<InternalDistributedMember> remaining) {
+  void afterPseudoCreateProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
+      FederationComponent newVal) {
     for (ProxyListener listener : proxyListeners) {
-      listener.quorumLost(system.getDistributionManager(), failures, remaining);
+      listener.afterPseudoCreateProxy(objectName, interfaceClass, proxyObject, newVal);
     }
   }
 
-  public static class UniversalListenerContainer {
+  boolean isStartedAndOpen() {
+    return isStarted && !closed && system.isConnected();
+  }
 
-    private List<MembershipListener> membershipListeners = new CopyOnWriteArrayList<>();
+  void afterUpdateProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
+      FederationComponent newVal, FederationComponent oldVal) {
+    for (ProxyListener listener : proxyListeners) {
+      listener.afterUpdateProxy(objectName, interfaceClass, proxyObject, newVal, oldVal);
+    }
+  }
 
-    public void memberJoined(InternalDistributedMember id) {
+  UniversalListenerContainer getUniversalListenerContainer() {
+    return universalListenerContainer;
+  }
+
+  private void verifyManagementService() {
+    if (!isStarted) {
+      throw new ManagementException(
+          "Management Service Not Started Yet");
+    }
+    if (!system.isConnected()) {
+      throw new ManagementException(
+          "Not Connected To Distributed System");
+    }
+    if (closed) {
+      throw new ManagementException(
+          "Management Service Is Closed");
+    }
+  }
+
+  /**
+   * This method will initialize all the internal components for Management and Monitoring
+   *
+   * It will: <br>
+   * a) start an JMX connectorServer <br>
+   * b) create a notification hub <br>
+   * c) register the ManagementFunction
+   */
+  private SystemManagementService init() {
+    try {
+      localManager =
+          new LocalManager(repo, system, this, cache, statisticsFactory, statisticsClock);
+      listener = new ManagementMembershipListener(this);
+
+      localManager.startManager();
+      system.getDistributionManager().addMembershipListener(listener);
+      isStarted = true;
+      return this;
+    } catch (CancelException e) {
+      // Rethrow all CancelExceptions
+      throw e;
+    } catch (Exception e) {
+      // Wrap all other exceptions as ManagementExceptions
+      logger.error(e.getMessage(), e);
+      throw new ManagementException(e);
+    }
+  }
+
+  private static FederatingManagerFactory createFederatingManagerFactory() {
+    try {
+      String federatingManagerFactoryName =
+          System.getProperty(FEDERATING_MANAGER_FACTORY_PROPERTY,
+              FederatingManagerFactoryImpl.class.getName());
+      Class<? extends FederatingManagerFactory> federatingManagerFactoryClass =
+          Class.forName(federatingManagerFactoryName)
+              .asSubclass(FederatingManagerFactory.class);
+      Constructor<? extends FederatingManagerFactory> constructor =
+          federatingManagerFactoryClass.getConstructor();
+      return constructor.newInstance();
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException
+        | NoSuchMethodException | InvocationTargetException e) {
+      return new FederatingManagerFactoryImpl();
+    }
+  }
+
+  @VisibleForTesting
+  public NotificationHub getNotificationHub() {
+    return notificationHub;
+  }
+
+  private static class FederatingManagerFactoryImpl implements FederatingManagerFactory {
+
+    public FederatingManagerFactoryImpl() {
+      // must be public for instantiation by reflection
+    }
+
+    @Override
+    public FederatingManager create(ManagementResourceRepo repo, InternalDistributedSystem system,
+        SystemManagementService service, InternalCache cache, StatisticsFactory statisticsFactory,
+        StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory, MemberMessenger messenger,
+        ExecutorService executorService) {
+      return new FederatingManager(repo, system, service, cache, statisticsFactory,
+          statisticsClock, proxyFactory, messenger, executorService);
+    }
+  }
+
+  static class UniversalListenerContainer {
+
+    private final Collection<MembershipListener> membershipListeners = new CopyOnWriteArrayList<>();
+
+    void memberJoined(InternalDistributedMember id) {
       MembershipEvent event = createEvent(id);
       for (MembershipListener listener : membershipListeners) {
         try {
@@ -713,18 +743,9 @@
       }
     }
 
-    public void memberDeparted(InternalDistributedMember id, boolean crashed) {
+    void memberDeparted(InternalDistributedMember id, boolean crashed) {
       MembershipEvent event = createEvent(id);
-      if (!crashed) {
-        for (MembershipListener listener : membershipListeners) {
-          try {
-            listener.memberLeft(event);
-          } catch (Exception e) {
-            logger.error("Could not invoke listener event memberLeft for listener[{}] due to ",
-                listener.getClass(), e.getMessage(), e);
-          }
-        }
-      } else {
+      if (crashed) {
         for (MembershipListener listener : membershipListeners) {
           try {
             listener.memberCrashed(event);
@@ -733,32 +754,23 @@
                 listener.getClass(), e.getMessage(), e);
           }
         }
+      } else {
+        for (MembershipListener listener : membershipListeners) {
+          try {
+            listener.memberLeft(event);
+          } catch (Exception e) {
+            logger.error("Could not invoke listener event memberLeft for listener[{}] due to ",
+                listener.getClass(), e.getMessage(), e);
+          }
+        }
       }
     }
 
-    private MembershipEvent createEvent(InternalDistributedMember id) {
-      final String memberId = id.getId();
-      final DistributedMember member = id;
-
-      return new MembershipEvent() {
-
-        @Override
-        public String getMemberId() {
-          return memberId;
-        }
-
-        @Override
-        public DistributedMember getDistributedMember() {
-          return member;
-        }
-      };
-    }
-
     /**
      * Registers a listener that receives call backs when a member joins or leaves the distributed
      * system.
      */
-    public void addMembershipListener(MembershipListener listener) {
+    private void addMembershipListener(MembershipListener listener) {
       membershipListeners.add(listener);
     }
 
@@ -767,23 +779,23 @@
      *
      * @see #addMembershipListener
      */
-    public void removeMembershipListener(MembershipListener listener) {
+    private void removeMembershipListener(MembershipListener listener) {
       membershipListeners.remove(listener);
     }
-  }
 
-  public UniversalListenerContainer getUniversalListenerContainer() {
-    return universalListenerContainer;
-  }
+    private MembershipEvent createEvent(DistributedMember id) {
+      return new MembershipEvent() {
 
-  @Override
-  public void addMembershipListener(MembershipListener listener) {
-    universalListenerContainer.addMembershipListener(listener);
+        @Override
+        public String getMemberId() {
+          return id.getId();
+        }
 
-  }
-
-  @Override
-  public void removeMembershipListener(MembershipListener listener) {
-    universalListenerContainer.removeMembershipListener(listener);
+        @Override
+        public DistributedMember getDistributedMember() {
+          return id;
+        }
+      };
+    }
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
index 1327d6f..bd8702b 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
@@ -14,9 +14,68 @@
  */
 package org.apache.geode.management.internal.beans;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableList;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.geode.management.JMXNotificationType.ASYNC_EVENT_QUEUE_CLOSED;
+import static org.apache.geode.management.JMXNotificationType.ASYNC_EVENT_QUEUE_CREATED;
+import static org.apache.geode.management.JMXNotificationType.CACHE_SERVER_STARTED;
+import static org.apache.geode.management.JMXNotificationType.CACHE_SERVER_STOPPED;
+import static org.apache.geode.management.JMXNotificationType.CACHE_SERVICE_CREATED;
+import static org.apache.geode.management.JMXNotificationType.CLIENT_CRASHED;
+import static org.apache.geode.management.JMXNotificationType.CLIENT_JOINED;
+import static org.apache.geode.management.JMXNotificationType.CLIENT_LEFT;
+import static org.apache.geode.management.JMXNotificationType.DISK_STORE_CLOSED;
+import static org.apache.geode.management.JMXNotificationType.DISK_STORE_CREATED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_RECEIVER_CREATED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_RECEIVER_DESTROYED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_RECEIVER_STARTED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_RECEIVER_STOPPED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_SENDER_CREATED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_SENDER_PAUSED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_SENDER_REMOVED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_SENDER_RESUMED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_SENDER_STARTED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_SENDER_STOPPED;
+import static org.apache.geode.management.JMXNotificationType.LOCATOR_STARTED;
+import static org.apache.geode.management.JMXNotificationType.LOCK_SERVICE_CLOSED;
+import static org.apache.geode.management.JMXNotificationType.LOCK_SERVICE_CREATED;
+import static org.apache.geode.management.JMXNotificationType.REGION_CLOSED;
+import static org.apache.geode.management.JMXNotificationType.REGION_CREATED;
+import static org.apache.geode.management.JMXNotificationType.SYSTEM_ALERT;
+import static org.apache.geode.management.JMXNotificationUserData.ALERT_LEVEL;
+import static org.apache.geode.management.JMXNotificationUserData.MEMBER;
+import static org.apache.geode.management.JMXNotificationUserData.THREAD;
+import static org.apache.geode.management.internal.AlertDetails.getAlertLevelAsString;
+import static org.apache.geode.management.internal.ManagementConstants.AGGREGATE_MBEAN_PATTERN;
+import static org.apache.geode.management.internal.ManagementConstants.ASYNC_EVENT_QUEUE_CLOSED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.ASYNC_EVENT_QUEUE_CREATED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.CACHE_SERVER_STARTED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.CACHE_SERVER_STOPPED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.CACHE_SERVICE_CREATED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.CLIENT_CRASHED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.CLIENT_JOINED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.CLIENT_LEFT_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.DISK_STORE_CLOSED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.DISK_STORE_CREATED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_RECEIVER_CREATED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_RECEIVER_DESTROYED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_RECEIVER_STARTED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_RECEIVER_STOPPED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_SENDER_CREATED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_SENDER_PAUSED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_SENDER_REMOVED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_SENDER_RESUMED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_SENDER_STARTED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_SENDER_STOPPED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.LOCATOR_STARTED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.LOCK_SERVICE_CLOSED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.LOCK_SERVICE_CREATED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.REGION_CLOSED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.REGION_CREATED_PREFIX;
+
+import java.lang.management.ManagementFactory;
 import java.lang.reflect.Type;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -30,18 +89,16 @@
 import javax.management.ObjectInstance;
 import javax.management.ObjectName;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.annotations.Immutable;
-import org.apache.geode.annotations.internal.MakeNotStatic;
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.DiskStore;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache.wan.GatewayReceiver;
 import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -51,7 +108,6 @@
 import org.apache.geode.internal.ClassLoadUtil;
 import org.apache.geode.internal.cache.CacheService;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.management.AsyncEventQueueMXBean;
@@ -59,8 +115,6 @@
 import org.apache.geode.management.DiskStoreMXBean;
 import org.apache.geode.management.GatewayReceiverMXBean;
 import org.apache.geode.management.GatewaySenderMXBean;
-import org.apache.geode.management.JMXNotificationType;
-import org.apache.geode.management.JMXNotificationUserData;
 import org.apache.geode.management.LocatorMXBean;
 import org.apache.geode.management.LockServiceMXBean;
 import org.apache.geode.management.ManagementException;
@@ -71,7 +125,6 @@
 import org.apache.geode.management.internal.AlertDetails;
 import org.apache.geode.management.internal.FederationComponent;
 import org.apache.geode.management.internal.MBeanJMXAdapter;
-import org.apache.geode.management.internal.ManagementConstants;
 import org.apache.geode.management.internal.SystemManagementService;
 import org.apache.geode.management.membership.ClientMembership;
 import org.apache.geode.management.membership.ClientMembershipEvent;
@@ -83,6 +136,7 @@
  * Acts as an intermediate between MBean layer and Federation Layer. Handles all Call backs from
  * GemFire to instantiate or remove MBeans from GemFire Domain.
  *
+ * <p>
  * Even though this class have a lot of utility functions it interacts with the state of the system
  * and contains some state itself.
  */
@@ -90,95 +144,57 @@
 
   private static final Logger logger = LogService.getLogger();
 
-  /** Internal ManagementService Instance **/
-  private SystemManagementService service;
-
-  /** GemFire Cache impl **/
-  private InternalCache internalCache;
-
-  /** Member Name **/
-  private String memberSource;
-
-  /**
-   * emitter is a helper class for sending notifications on behalf of the MemberMBean
-   **/
-  private NotificationBroadcasterSupport memberLevelNotifEmitter;
-
-  /** The <code>MBeanServer</code> for this application */
-  @MakeNotStatic
-  public static final MBeanServer mbeanServer = MBeanJMXAdapter.mbeanServer;
-
-  /** MemberMBean instance **/
-  private MemberMBean memberBean;
-
-  private volatile boolean serviceInitialised = false;
-
-  @MakeNotStatic
-  private MBeanAggregator aggregator;
-
   @Immutable
-  public static final List<Class> refreshOnInit;
+  private static final List<String> INTERNAL_LOCK_SERVICES =
+      unmodifiableList(asList(DLockService.DTLS, DLockService.LTLS,
+          PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME,
+          PeerTypeRegistration.LOCK_SERVICE_NAME));
 
-  @Immutable
-  public static final List<String> internalLocks;
-
-  static {
-    refreshOnInit =
-        Collections.unmodifiableList(Arrays.asList(RegionMXBean.class, MemberMXBean.class));
-
-    internalLocks = Collections.unmodifiableList(Arrays.asList(
-        DLockService.DTLS, // From reserved lock service name
-        DLockService.LTLS, // From reserved lock service name
-        PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME,
-        PeerTypeRegistration.LOCK_SERVICE_NAME));
-  }
-
-  protected MemberMBeanBridge memberMBeanBridge;
-
+  private final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
   private final Object regionOpLock = new Object();
 
+  private SystemManagementService service;
+  private InternalCache internalCache;
+  private String memberSource;
+  private NotificationBroadcasterSupport memberLevelNotificationEmitter;
+  private MemberMBean memberBean;
+  private MBeanAggregator aggregator;
+  private MemberMBeanBridge memberMBeanBridge;
+
+  private volatile boolean serviceInitialised;
+
   /**
-   * Adapter life cycle is tied with the Cache . So its better to make all cache level artifacts as
+   * Adapter life cycle is tied with the Cache. So its better to make all cache level artifacts as
    * instance variable
-   *
-   * @param cache gemfire cache
    */
-  protected void handleCacheCreation(InternalCache cache) throws ManagementException {
+  void handleCacheCreation(InternalCache cache) throws ManagementException {
     try {
-      this.internalCache = cache;
-      this.service =
-          (SystemManagementService) ManagementService.getManagementService(internalCache);
+      internalCache = cache;
+      service = (SystemManagementService) ManagementService.getManagementService(internalCache);
 
-      this.memberMBeanBridge = new MemberMBeanBridge(internalCache, service).init();
-      this.memberBean = new MemberMBean(memberMBeanBridge);
-      this.memberLevelNotifEmitter = memberBean;
+      memberMBeanBridge = new MemberMBeanBridge(internalCache, service).init();
+      memberBean = new MemberMBean(memberMBeanBridge);
+      memberLevelNotificationEmitter = memberBean;
+      memberSource = MBeanJMXAdapter.getMemberNameOrUniqueId(cache.getMyId());
 
-      ObjectName memberMBeanName = MBeanJMXAdapter.getMemberMBeanName(
-          InternalDistributedSystem.getConnectedInstance().getDistributedMember());
+      // Type casting to MemberMXBean to expose only those methods described in interface
+      ObjectName objectName = MBeanJMXAdapter.getMemberMBeanName(cache.getMyId());
+      ObjectName federatedName = service.registerInternalMBean(memberBean, objectName);
+      service.federate(federatedName, MemberMXBean.class, true);
 
-      memberSource = MBeanJMXAdapter
-          .getMemberNameOrUniqueId(internalCache.getDistributedSystem().getDistributedMember());
+      serviceInitialised = true;
 
-      // Type casting to MemberMXBean to expose only those methods described in
-      // the interface;
-      ObjectName changedMBeanName = service.registerInternalMBean(memberBean, memberMBeanName);
-      service.federate(changedMBeanName, MemberMXBean.class, true);
-
-      this.serviceInitialised = true;
-
-      // Service initialised is only for ManagementService and not necessarily
-      // Manager service.
+      // Service initialised is only for ManagementService and not necessarily Manager service.
 
       // For situations where locator is created before any cache is created
       if (InternalLocator.hasLocator()) {
-        Locator loc = InternalLocator.getLocator();
-        handleLocatorStart(loc);
+        handleLocatorStart(InternalLocator.getLocator());
       }
 
       if (cache.getInternalDistributedSystem().getConfig().getJmxManager()) {
-        this.service.createManager();
+        service.createManager();
         if (cache.getInternalDistributedSystem().getConfig().getJmxManagerStart()) {
-          this.service.startManager();
+          service.startManager();
         }
       }
 
@@ -201,48 +217,50 @@
   /**
    * Handles all the distributed mbean creation part when a Manager is started
    */
-  protected void handleManagerStart() throws ManagementException {
+  void handleManagerStart() throws ManagementException {
     if (!isServiceInitialised("handleManagerStart")) {
       return;
     }
-    MBeanJMXAdapter jmxAdapter = service.getJMXAdapter();
-    Map<ObjectName, Object> registeredMBeans = jmxAdapter.getLocalGemFireMBean();
 
-    DistributedSystemBridge dsBridge = new DistributedSystemBridge(service, internalCache);
-    this.aggregator = new MBeanAggregator(dsBridge);
+    DistributedSystemBridge distributedSystemBridge =
+        new DistributedSystemBridge(service, internalCache);
+    aggregator = new MBeanAggregator(distributedSystemBridge);
+
     // register the aggregator for Federation framework to use
     service.addProxyListener(aggregator);
 
-    /*
-     * get the local member mbean as it need to be provided to aggregator first
-     */
+    // get the local member mbean as it need to be provided to aggregator first
+    MemberMXBean localMemberMXBean = service.getMemberMXBean();
 
-    MemberMXBean localMember = service.getMemberMXBean();
     ObjectName memberObjectName = MBeanJMXAdapter.getMemberMBeanName(
         InternalDistributedSystem.getConnectedInstance().getDistributedMember());
 
-    FederationComponent addedComp =
+    FederationComponent memberFederation =
         service.getLocalManager().getFedComponents().get(memberObjectName);
 
-    service.afterCreateProxy(memberObjectName, MemberMXBean.class, localMember, addedComp);
+    service.afterCreateProxy(memberObjectName, MemberMXBean.class, localMemberMXBean,
+        memberFederation);
+
+    MBeanJMXAdapter jmxAdapter = service.getJMXAdapter();
+    Map<ObjectName, Object> registeredMBeans = jmxAdapter.getLocalGemFireMBean();
 
     for (ObjectName objectName : registeredMBeans.keySet()) {
       if (objectName.equals(memberObjectName)) {
         continue;
       }
       Object object = registeredMBeans.get(objectName);
-      ObjectInstance instance;
       try {
-        instance = mbeanServer.getObjectInstance(objectName);
+        ObjectInstance instance = mbeanServer.getObjectInstance(objectName);
         String className = instance.getClassName();
-        Class cls = ClassLoadUtil.classFromName(className);
-        Type[] intfTyps = cls.getGenericInterfaces();
+        Class clazz = ClassLoadUtil.classFromName(className);
+        Type[] interfaceTypes = clazz.getGenericInterfaces();
 
-        FederationComponent newObj = service.getLocalManager().getFedComponents().get(objectName);
+        FederationComponent federation =
+            service.getLocalManager().getFedComponents().get(objectName);
 
-        for (Type intfTyp1 : intfTyps) {
-          Class intfTyp = (Class) intfTyp1;
-          service.afterCreateProxy(objectName, intfTyp, object, newObj);
+        for (Type interfaceType : interfaceTypes) {
+          Class interfaceTypeAsClass = (Class) interfaceType;
+          service.afterCreateProxy(objectName, interfaceTypeAsClass, object, federation);
 
         }
       } catch (InstanceNotFoundException e) {
@@ -263,75 +281,69 @@
    * Handles all the clean up activities when a Manager is stopped It clears the distributed mbeans
    * and underlying data structures
    */
-  protected void handleManagerStop() throws ManagementException {
+  void handleManagerStop() throws ManagementException {
     if (!isServiceInitialised("handleManagerStop")) {
       return;
     }
-    MBeanJMXAdapter jmxAdapter = service.getJMXAdapter();
-    Map<ObjectName, Object> registeredMBeans = jmxAdapter.getLocalGemFireMBean();
 
-    ObjectName aggregatemMBeanPattern;
-    try {
-      aggregatemMBeanPattern = new ObjectName(ManagementConstants.AGGREGATE_MBEAN_PATTERN);
-    } catch (MalformedObjectNameException | NullPointerException e1) {
-      throw new ManagementException(e1);
-    }
-
-    MemberMXBean localMember = service.getMemberMXBean();
+    MemberMXBean localMemberMXBean = service.getMemberMXBean();
 
     ObjectName memberObjectName = MBeanJMXAdapter.getMemberMBeanName(
         InternalDistributedSystem.getConnectedInstance().getDistributedMember());
 
-    FederationComponent removedComp =
+    FederationComponent memberFederation =
         service.getLocalManager().getFedComponents().get(memberObjectName);
 
-    service.afterRemoveProxy(memberObjectName, MemberMXBean.class, localMember, removedComp);
+    service.afterRemoveProxy(memberObjectName, MemberMXBean.class, localMemberMXBean,
+        memberFederation);
 
+    ObjectName aggregateMBeanPattern = aggregateMBeanPattern();
+    MBeanJMXAdapter jmxAdapter = service.getJMXAdapter();
+    Map<ObjectName, Object> registeredMBeans = jmxAdapter.getLocalGemFireMBean();
     for (ObjectName objectName : registeredMBeans.keySet()) {
       if (objectName.equals(memberObjectName)) {
         continue;
       }
-      if (aggregatemMBeanPattern.apply(objectName)) {
+      if (aggregateMBeanPattern.apply(objectName)) {
         continue;
       }
       Object object = registeredMBeans.get(objectName);
-      ObjectInstance instance;
       try {
-        instance = mbeanServer.getObjectInstance(objectName);
+        ObjectInstance instance = mbeanServer.getObjectInstance(objectName);
         String className = instance.getClassName();
-        Class cls = ClassLoadUtil.classFromName(className);
-        Type[] intfTyps = cls.getGenericInterfaces();
+        Class clazz = ClassLoadUtil.classFromName(className);
+        Type[] interfaceTypes = clazz.getGenericInterfaces();
 
-        FederationComponent oldObj = service.getLocalManager().getFedComponents().get(objectName);
+        FederationComponent federation =
+            service.getLocalManager().getFedComponents().get(objectName);
 
-        for (Type intfTyp1 : intfTyps) {
-          Class intfTyp = (Class) intfTyp1;
-          service.afterRemoveProxy(objectName, intfTyp, object, oldObj);
+        for (Type interfaceType : interfaceTypes) {
+          Class interfaceTypeClass = (Class) interfaceType;
+          service.afterRemoveProxy(objectName, interfaceTypeClass, object, federation);
         }
       } catch (InstanceNotFoundException | ClassNotFoundException e) {
         logger.warn("Failed to invoke aggregator for {} with exception {}", objectName,
             e.getMessage(), e);
       }
     }
-    service.removeProxyListener(this.aggregator);
-    this.aggregator = null;
+
+    service.removeProxyListener(aggregator);
+    aggregator = null;
   }
 
   /**
    * Assumption is always cache and MemberMbean has been will be created first
    */
-  protected void handleManagerCreation() throws ManagementException {
+  void handleManagerCreation() throws ManagementException {
     if (!isServiceInitialised("handleManagerCreation")) {
       return;
     }
 
-    ObjectName managerMBeanName = MBeanJMXAdapter.getManagerName();
+    ObjectName objectName = MBeanJMXAdapter.getManagerName();
+    ManagerMBeanBridge managerMBeanBridge = new ManagerMBeanBridge(service);
+    ManagerMXBean managerMXBean = new ManagerMBean(managerMBeanBridge);
 
-    ManagerMBeanBridge bridge = new ManagerMBeanBridge(service);
-
-    ManagerMXBean bean = new ManagerMBean(bridge);
-
-    service.registerInternalMBean(bean, managerMBeanName);
+    service.registerInternalMBean(managerMXBean, objectName);
   }
 
   /**
@@ -340,87 +352,85 @@
    *
    * @param region the region for which the call back is invoked
    */
-  public <K, V> void handleRegionCreation(Region<K, V> region) throws ManagementException {
+  <K, V> void handleRegionCreation(Region<K, V> region) throws ManagementException {
     if (!isServiceInitialised("handleRegionCreation")) {
       return;
     }
+
     // Moving region creation operation inside a guarded block
     // After getting access to regionOpLock it again checks for region
     // destroy status
 
     synchronized (regionOpLock) {
-      LocalRegion localRegion = (LocalRegion) region;
-      if (localRegion.isDestroyed()) {
+      if (region.isDestroyed()) {
         return;
       }
+
       // Bridge is responsible for extracting data from GemFire Layer
-      RegionMBeanBridge<K, V> bridge = RegionMBeanBridge.getInstance(region);
-
-      RegionMXBean regionMBean = new RegionMBean<>(bridge);
-      ObjectName regionMBeanName = MBeanJMXAdapter.getRegionMBeanName(
+      RegionMBeanBridge<K, V> regionMBeanBridge = RegionMBeanBridge.getInstance(region);
+      RegionMXBean regionMXBean = new RegionMBean<>(regionMBeanBridge);
+      ObjectName objectName = MBeanJMXAdapter.getRegionMBeanName(
           internalCache.getDistributedSystem().getDistributedMember(), region.getFullPath());
-      ObjectName changedMBeanName = service.registerInternalMBean(regionMBean, regionMBeanName);
-      service.federate(changedMBeanName, RegionMXBean.class, true);
+      ObjectName federatedName = service.registerInternalMBean(regionMXBean, objectName);
+      service.federate(federatedName, RegionMXBean.class, true);
 
-      Notification notification = new Notification(JMXNotificationType.REGION_CREATED, memberSource,
+      Notification notification = new Notification(REGION_CREATED, memberSource,
           SequenceNumber.next(), System.currentTimeMillis(),
-          ManagementConstants.REGION_CREATED_PREFIX + region.getFullPath());
-      memberLevelNotifEmitter.sendNotification(notification);
+          REGION_CREATED_PREFIX + region.getFullPath());
+      memberLevelNotificationEmitter.sendNotification(notification);
+
       memberMBeanBridge.addRegion(region);
     }
   }
 
   /**
    * Handles Disk Creation. Will create DiskStoreMXBean and will send a notification
-   *
-   * @param disk the disk store for which the call back is invoked
    */
-  protected void handleDiskCreation(DiskStore disk) throws ManagementException {
+  void handleDiskCreation(DiskStore diskStore) throws ManagementException {
     if (!isServiceInitialised("handleDiskCreation")) {
       return;
     }
-    DiskStoreMBeanBridge bridge = new DiskStoreMBeanBridge(disk);
-    DiskStoreMXBean diskStoreMBean = new DiskStoreMBean(bridge);
-    ObjectName diskStoreMBeanName = MBeanJMXAdapter.getDiskStoreMBeanName(
-        internalCache.getDistributedSystem().getDistributedMember(), disk.getName());
-    ObjectName changedMBeanName = service.registerInternalMBean(diskStoreMBean, diskStoreMBeanName);
 
-    service.federate(changedMBeanName, DiskStoreMXBean.class, true);
+    DiskStoreMBeanBridge diskStoreMBeanBridge = new DiskStoreMBeanBridge(diskStore);
+    DiskStoreMXBean diskStoreMXBean = new DiskStoreMBean(diskStoreMBeanBridge);
+    ObjectName objectName = MBeanJMXAdapter.getDiskStoreMBeanName(
+        internalCache.getDistributedSystem().getDistributedMember(), diskStore.getName());
+    ObjectName federatedName = service.registerInternalMBean(diskStoreMXBean, objectName);
+    service.federate(federatedName, DiskStoreMXBean.class, true);
 
-    Notification notification = new Notification(JMXNotificationType.DISK_STORE_CREATED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.DISK_STORE_CREATED_PREFIX + disk.getName());
-    memberLevelNotifEmitter.sendNotification(notification);
-    memberMBeanBridge.addDiskStore(disk);
+    Notification notification = new Notification(DISK_STORE_CREATED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(),
+        DISK_STORE_CREATED_PREFIX + diskStore.getName());
+    memberLevelNotificationEmitter.sendNotification(notification);
+
+    memberMBeanBridge.addDiskStore(diskStore);
   }
 
   /**
    * Handles LockService Creation
    *
    */
-  protected void handleLockServiceCreation(DLockService lockService) throws ManagementException {
+  void handleLockServiceCreation(DLockService lockService) throws ManagementException {
     if (!isServiceInitialised("handleLockServiceCreation")) {
       return;
     }
     // Internal Locks Should not be exposed to client for monitoring
-    if (internalLocks.contains(lockService.getName())) {
+    if (INTERNAL_LOCK_SERVICES.contains(lockService.getName())) {
       return;
     }
-    LockServiceMBeanBridge bridge = new LockServiceMBeanBridge(lockService);
-    LockServiceMXBean lockServiceMBean = new LockServiceMBean(bridge);
 
-    ObjectName lockServiceMBeanName = MBeanJMXAdapter.getLockServiceMBeanName(
+    LockServiceMBeanBridge lockServiceMBeanBridge = new LockServiceMBeanBridge(lockService);
+    LockServiceMXBean lockServiceMXBean = new LockServiceMBean(lockServiceMBeanBridge);
+    ObjectName objectName = MBeanJMXAdapter.getLockServiceMBeanName(
         internalCache.getDistributedSystem().getDistributedMember(), lockService.getName());
+    ObjectName federatedName =
+        service.registerInternalMBean(lockServiceMXBean, objectName);
+    service.federate(federatedName, LockServiceMXBean.class, true);
 
-    ObjectName changedMBeanName =
-        service.registerInternalMBean(lockServiceMBean, lockServiceMBeanName);
-
-    service.federate(changedMBeanName, LockServiceMXBean.class, true);
-
-    Notification notification = new Notification(JMXNotificationType.LOCK_SERVICE_CREATED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.LOCK_SERVICE_CREATED_PREFIX + lockService.getName());
-    memberLevelNotifEmitter.sendNotification(notification);
+    Notification notification = new Notification(LOCK_SERVICE_CREATED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(),
+        LOCK_SERVICE_CREATED_PREFIX + lockService.getName());
+    memberLevelNotificationEmitter.sendNotification(notification);
 
     memberMBeanBridge.addLockServiceStats(lockService);
   }
@@ -430,165 +440,151 @@
    *
    * @param sender the specific gateway sender
    */
-  protected void handleGatewaySenderCreation(GatewaySender sender) throws ManagementException {
+  void handleGatewaySenderCreation(GatewaySender sender) throws ManagementException {
     if (!isServiceInitialised("handleGatewaySenderCreation")) {
       return;
     }
-    GatewaySenderMBeanBridge bridge = new GatewaySenderMBeanBridge(sender);
 
-    GatewaySenderMXBean senderMBean = new GatewaySenderMBean(bridge);
-    ObjectName senderObjectName = MBeanJMXAdapter.getGatewaySenderMBeanName(
+    GatewaySenderMBeanBridge gatewaySenderMBeanBridge = new GatewaySenderMBeanBridge(sender);
+    GatewaySenderMXBean gatewaySenderMXBean = new GatewaySenderMBean(gatewaySenderMBeanBridge);
+    ObjectName objectName = MBeanJMXAdapter.getGatewaySenderMBeanName(
         internalCache.getDistributedSystem().getDistributedMember(), sender.getId());
+    ObjectName federatedName = service.registerInternalMBean(gatewaySenderMXBean, objectName);
+    service.federate(federatedName, GatewaySenderMXBean.class, true);
 
-    ObjectName changedMBeanName = service.registerInternalMBean(senderMBean, senderObjectName);
-
-    service.federate(changedMBeanName, GatewaySenderMXBean.class, true);
-
-    Notification notification = new Notification(JMXNotificationType.GATEWAY_SENDER_CREATED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.GATEWAY_SENDER_CREATED_PREFIX);
-    memberLevelNotifEmitter.sendNotification(notification);
+    Notification notification = new Notification(GATEWAY_SENDER_CREATED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(), GATEWAY_SENDER_CREATED_PREFIX);
+    memberLevelNotificationEmitter.sendNotification(notification);
   }
 
   /**
    * Handles Gateway receiver creation
    *
-   * @param recv specific gateway receiver
+   * @param gatewayReceiver specific gateway receiver
    */
-  protected void handleGatewayReceiverCreate(GatewayReceiver recv) throws ManagementException {
+  void handleGatewayReceiverCreate(GatewayReceiver gatewayReceiver) throws ManagementException {
     if (!isServiceInitialised("handleGatewayReceiverCreate")) {
       return;
     }
-    if (!recv.isManualStart()) {
+    if (!gatewayReceiver.isManualStart()) {
       return;
     }
 
-    createGatewayReceiverMBean(recv);
+    createGatewayReceiverMBean(gatewayReceiver);
   }
 
-  private void createGatewayReceiverMBean(GatewayReceiver recv) {
-    GatewayReceiverMBeanBridge bridge = new GatewayReceiverMBeanBridge(recv);
-
-    GatewayReceiverMXBean receiverMBean = new GatewayReceiverMBean(bridge);
-    ObjectName recvObjectName = MBeanJMXAdapter
+  private void createGatewayReceiverMBean(GatewayReceiver gatewayReceiver) {
+    GatewayReceiverMBeanBridge gatewayReceiverMBeanBridge =
+        new GatewayReceiverMBeanBridge(gatewayReceiver);
+    GatewayReceiverMXBean gatewayReceiverMXBean =
+        new GatewayReceiverMBean(gatewayReceiverMBeanBridge);
+    ObjectName objectName = MBeanJMXAdapter
         .getGatewayReceiverMBeanName(internalCache.getDistributedSystem().getDistributedMember());
+    ObjectName federatedName = service.registerInternalMBean(gatewayReceiverMXBean, objectName);
+    service.federate(federatedName, GatewayReceiverMXBean.class, true);
 
-    ObjectName changedMBeanName = service.registerInternalMBean(receiverMBean, recvObjectName);
-
-    service.federate(changedMBeanName, GatewayReceiverMXBean.class, true);
-
-    Notification notification = new Notification(JMXNotificationType.GATEWAY_RECEIVER_CREATED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.GATEWAY_RECEIVER_CREATED_PREFIX);
-    memberLevelNotifEmitter.sendNotification(notification);
+    Notification notification = new Notification(GATEWAY_RECEIVER_CREATED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(), GATEWAY_RECEIVER_CREATED_PREFIX);
+    memberLevelNotificationEmitter.sendNotification(notification);
   }
 
-  /**
-   * Handles Gateway receiver destroy
-   *
-   * @param recv specific gateway receiver
-   */
-  protected void handleGatewayReceiverDestroy(GatewayReceiver recv) throws ManagementException {
+  void handleGatewayReceiverDestroy() throws ManagementException {
     if (!isServiceInitialised("handleGatewayReceiverDestroy")) {
       return;
     }
 
-    GatewayReceiverMBean mbean = (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean();
-    GatewayReceiverMBeanBridge bridge = mbean.getBridge();
+    GatewayReceiverMBean gatewayReceiverMBean =
+        (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean();
+    GatewayReceiverMBeanBridge gatewayReceiverMBeanBridge = gatewayReceiverMBean.getBridge();
 
-    bridge.destroyServer();
-    ObjectName objectName = (MBeanJMXAdapter
-        .getGatewayReceiverMBeanName(internalCache.getDistributedSystem().getDistributedMember()));
+    gatewayReceiverMBeanBridge.destroyServer();
 
+    ObjectName objectName = MBeanJMXAdapter
+        .getGatewayReceiverMBeanName(internalCache.getDistributedSystem().getDistributedMember());
     service.unregisterMBean(objectName);
-    Notification notification = new Notification(JMXNotificationType.GATEWAY_RECEIVER_DESTROYED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.GATEWAY_RECEIVER_DESTROYED_PREFIX);
-    memberLevelNotifEmitter.sendNotification(notification);
+
+    Notification notification = new Notification(GATEWAY_RECEIVER_DESTROYED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(), GATEWAY_RECEIVER_DESTROYED_PREFIX);
+    memberLevelNotificationEmitter.sendNotification(notification);
   }
 
   /**
    * Handles Gateway receiver creation
    *
-   * @param recv specific gateway receiver
+   * @param gatewayReceiver specific gateway receiver
    */
-  protected void handleGatewayReceiverStart(GatewayReceiver recv) throws ManagementException {
+  void handleGatewayReceiverStart(GatewayReceiver gatewayReceiver) throws ManagementException {
     if (!isServiceInitialised("handleGatewayReceiverStart")) {
       return;
     }
 
-    if (!recv.isManualStart()) {
-      createGatewayReceiverMBean(recv);
+    if (!gatewayReceiver.isManualStart()) {
+      createGatewayReceiverMBean(gatewayReceiver);
     }
 
-    GatewayReceiverMBean mbean = (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean();
-    GatewayReceiverMBeanBridge bridge = mbean.getBridge();
+    GatewayReceiverMBean gatewayReceiverMBean =
+        (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean();
+    GatewayReceiverMBeanBridge gatewayReceiverMBeanBridge = gatewayReceiverMBean.getBridge();
 
-    bridge.startServer();
+    gatewayReceiverMBeanBridge.startServer();
 
-    Notification notification = new Notification(JMXNotificationType.GATEWAY_RECEIVER_STARTED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.GATEWAY_RECEIVER_STARTED_PREFIX);
-    memberLevelNotifEmitter.sendNotification(notification);
+    Notification notification = new Notification(GATEWAY_RECEIVER_STARTED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(), GATEWAY_RECEIVER_STARTED_PREFIX);
+    memberLevelNotificationEmitter.sendNotification(notification);
   }
 
-  /**
-   * Handles Gateway receiver creation
-   *
-   * @param recv specific gateway receiver
-   */
-  protected void handleGatewayReceiverStop(GatewayReceiver recv) throws ManagementException {
+  void handleGatewayReceiverStop() throws ManagementException {
     if (!isServiceInitialised("handleGatewayReceiverStop")) {
       return;
     }
-    GatewayReceiverMBean mbean = (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean();
-    GatewayReceiverMBeanBridge bridge = mbean.getBridge();
 
-    bridge.stopServer();
+    GatewayReceiverMBean gatewayReceiverMBean =
+        (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean();
+    GatewayReceiverMBeanBridge gatewayReceiverMBeanBridge = gatewayReceiverMBean.getBridge();
 
-    Notification notification = new Notification(JMXNotificationType.GATEWAY_RECEIVER_STOPPED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.GATEWAY_RECEIVER_STOPPED_PREFIX);
-    memberLevelNotifEmitter.sendNotification(notification);
+    gatewayReceiverMBeanBridge.stopServer();
+
+    Notification notification = new Notification(GATEWAY_RECEIVER_STOPPED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(), GATEWAY_RECEIVER_STOPPED_PREFIX);
+    memberLevelNotificationEmitter.sendNotification(notification);
   }
 
-  protected void handleAsyncEventQueueCreation(AsyncEventQueue queue) throws ManagementException {
+  void handleAsyncEventQueueCreation(AsyncEventQueue asyncEventQueue) throws ManagementException {
     if (!isServiceInitialised("handleAsyncEventQueueCreation")) {
       return;
     }
-    AsyncEventQueueMBeanBridge bridge = new AsyncEventQueueMBeanBridge(queue);
-    AsyncEventQueueMXBean queueMBean = new AsyncEventQueueMBean(bridge);
-    ObjectName senderObjectName = MBeanJMXAdapter.getAsyncEventQueueMBeanName(
-        internalCache.getDistributedSystem().getDistributedMember(), queue.getId());
 
-    ObjectName changedMBeanName = service.registerInternalMBean(queueMBean, senderObjectName);
+    AsyncEventQueueMBeanBridge asyncEventQueueMBeanBridge =
+        new AsyncEventQueueMBeanBridge(asyncEventQueue);
+    AsyncEventQueueMXBean asyncEventQueueMXBean =
+        new AsyncEventQueueMBean(asyncEventQueueMBeanBridge);
+    ObjectName objectName = MBeanJMXAdapter.getAsyncEventQueueMBeanName(
+        internalCache.getDistributedSystem().getDistributedMember(), asyncEventQueue.getId());
+    ObjectName federatedName = service.registerInternalMBean(asyncEventQueueMXBean, objectName);
+    service.federate(federatedName, AsyncEventQueueMXBean.class, true);
 
-    service.federate(changedMBeanName, AsyncEventQueueMXBean.class, true);
-
-    Notification notification = new Notification(JMXNotificationType.ASYNC_EVENT_QUEUE_CREATED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.ASYNC_EVENT_QUEUE_CREATED_PREFIX);
-    memberLevelNotifEmitter.sendNotification(notification);
+    Notification notification = new Notification(ASYNC_EVENT_QUEUE_CREATED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(), ASYNC_EVENT_QUEUE_CREATED_PREFIX);
+    memberLevelNotificationEmitter.sendNotification(notification);
   }
 
-  /**
-   * Handles AsyncEventQueue Removal
-   *
-   * @param queue The AsyncEventQueue being removed
-   */
-  protected void handleAsyncEventQueueRemoval(AsyncEventQueue queue) throws ManagementException {
+  void handleAsyncEventQueueRemoval(AsyncEventQueue asyncEventQueue) throws ManagementException {
     if (!isServiceInitialised("handleAsyncEventQueueRemoval")) {
       return;
     }
 
-    ObjectName asycnEventQueueMBeanName = MBeanJMXAdapter.getAsyncEventQueueMBeanName(
-        internalCache.getDistributedSystem().getDistributedMember(), queue.getId());
-    AsyncEventQueueMBean bean;
+    ObjectName objectName = MBeanJMXAdapter.getAsyncEventQueueMBeanName(
+        internalCache.getDistributedSystem().getDistributedMember(), asyncEventQueue.getId());
+
     try {
-      bean = (AsyncEventQueueMBean) service.getLocalAsyncEventQueueMXBean(queue.getId());
-      if (bean == null) {
+      AsyncEventQueueMBean asyncEventQueueMBean =
+          (AsyncEventQueueMBean) service.getLocalAsyncEventQueueMXBean(asyncEventQueue.getId());
+      if (asyncEventQueueMBean == null) {
         return;
       }
+
+      asyncEventQueueMBean.stopMonitor();
+
     } catch (ManagementException e) {
       // If no bean found its a NO-OP
       if (logger.isDebugEnabled()) {
@@ -597,55 +593,39 @@
       return;
     }
 
-    bean.stopMonitor();
+    service.unregisterMBean(objectName);
 
-    service.unregisterMBean(asycnEventQueueMBeanName);
-
-    Notification notification = new Notification(JMXNotificationType.ASYNC_EVENT_QUEUE_CLOSED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.ASYNC_EVENT_QUEUE_CLOSED_PREFIX + queue.getId());
-    memberLevelNotifEmitter.sendNotification(notification);
+    Notification notification = new Notification(ASYNC_EVENT_QUEUE_CLOSED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(),
+        ASYNC_EVENT_QUEUE_CLOSED_PREFIX + asyncEventQueue.getId());
+    memberLevelNotificationEmitter.sendNotification(notification);
   }
 
   /**
    * Sends the alert with the Object source as member. This notification will get filtered out for
    * particular alert level
-   *
    */
-  protected void handleSystemNotification(AlertDetails details) {
+  void handleSystemNotification(AlertDetails alertDetails) {
     if (!isServiceInitialised("handleSystemNotification")) {
       return;
     }
+
     if (service.isManager()) {
       String systemSource = "DistributedSystem("
           + service.getDistributedSystemMXBean().getDistributedSystemId() + ")";
-      Map<String, String> userData = prepareUserData(details);
-
-
-      Notification notification = new Notification(JMXNotificationType.SYSTEM_ALERT, systemSource,
-          SequenceNumber.next(), details.getMsgTime().getTime(), details.getMsg());
-
-      notification.setUserData(userData);
+      Notification notification = new Notification(SYSTEM_ALERT, systemSource,
+          SequenceNumber.next(), alertDetails.getMsgTime().getTime(), alertDetails.getMsg());
+      notification.setUserData(prepareUserData(alertDetails));
       service.handleNotification(notification);
     }
   }
 
-  private Map<String, String> prepareUserData(AlertDetails details) {
+  private Map<String, String> prepareUserData(AlertDetails alertDetails) {
     Map<String, String> userData = new HashMap<>();
-    userData.put(JMXNotificationUserData.ALERT_LEVEL,
-        AlertDetails.getAlertLevelAsString(details.getAlertLevel()));
 
-    String source = details.getSource();
-    userData.put(JMXNotificationUserData.THREAD, source);
-
-    InternalDistributedMember sender = details.getSender();
-    String nameOrId = memberSource;
-    if (sender != null) {
-      nameOrId = sender.getName();
-      nameOrId = StringUtils.isNotBlank(nameOrId) ? nameOrId : sender.getId();
-    }
-
-    userData.put(JMXNotificationUserData.MEMBER, nameOrId);
+    userData.put(ALERT_LEVEL, getAlertLevelAsString(alertDetails.getAlertLevel()));
+    userData.put(THREAD, alertDetails.getSource());
+    userData.put(MEMBER, getNameOrId(alertDetails.getSender()));
 
     return userData;
   }
@@ -655,83 +635,68 @@
    *
    * @param cacheServer cache server instance
    */
-  protected void handleCacheServerStart(CacheServer cacheServer) {
+  void handleCacheServerStart(CacheServer cacheServer) {
     if (!isServiceInitialised("handleCacheServerStart")) {
       return;
     }
 
     CacheServerBridge cacheServerBridge = new CacheServerBridge(internalCache, cacheServer);
     cacheServerBridge.setMemberMBeanBridge(memberMBeanBridge);
-
     CacheServerMBean cacheServerMBean = new CacheServerMBean(cacheServerBridge);
-
-    ObjectName cacheServerMBeanName = MBeanJMXAdapter.getClientServiceMBeanName(
+    ObjectName objectName = MBeanJMXAdapter.getClientServiceMBeanName(
         cacheServer.getPort(), internalCache.getDistributedSystem().getDistributedMember());
-
-    ObjectName changedMBeanName =
-        service.registerInternalMBean(cacheServerMBean, cacheServerMBeanName);
+    ObjectName federatedName = service.registerInternalMBean(cacheServerMBean, objectName);
 
     ClientMembershipListener managementClientListener = new CacheServerMembershipListenerAdapter(
-        cacheServerMBean, memberLevelNotifEmitter, changedMBeanName);
+        cacheServerMBean, memberLevelNotificationEmitter, federatedName);
     ClientMembership.registerClientMembershipListener(managementClientListener);
-
     cacheServerBridge.setClientMembershipListener(managementClientListener);
 
-    service.federate(changedMBeanName, CacheServerMXBean.class, true);
+    service.federate(federatedName, CacheServerMXBean.class, true);
 
-    Notification notification = new Notification(JMXNotificationType.CACHE_SERVER_STARTED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.CACHE_SERVER_STARTED_PREFIX);
-
-    memberLevelNotifEmitter.sendNotification(notification);
+    Notification notification = new Notification(CACHE_SERVER_STARTED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(), CACHE_SERVER_STARTED_PREFIX);
+    memberLevelNotificationEmitter.sendNotification(notification);
 
     memberMBeanBridge.setCacheServer(true);
   }
 
   /**
    * Assumption is its a cache server instance. For Gateway receiver there will be a separate method
-   *
-   * @param server cache server instance
    */
-  protected void handleCacheServerStop(CacheServer server) {
+  void handleCacheServerStop(CacheServer server) {
     if (!isServiceInitialised("handleCacheServerStop")) {
       return;
     }
 
-    CacheServerMBean mbean = (CacheServerMBean) service.getLocalCacheServerMXBean(server.getPort());
+    CacheServerMBean cacheServerMBean =
+        (CacheServerMBean) service.getLocalCacheServerMXBean(server.getPort());
 
-    ClientMembershipListener listener = mbean.getBridge().getClientMembershipListener();
-
-    if (listener != null) {
-      ClientMembership.unregisterClientMembershipListener(listener);
+    ClientMembershipListener clientMembershipListener =
+        cacheServerMBean.getBridge().getClientMembershipListener();
+    if (clientMembershipListener != null) {
+      ClientMembership.unregisterClientMembershipListener(clientMembershipListener);
     }
 
-    mbean.stopMonitor();
+    cacheServerMBean.stopMonitor();
 
-    ObjectName cacheServerMBeanName = MBeanJMXAdapter.getClientServiceMBeanName(server.getPort(),
+    ObjectName objectName = MBeanJMXAdapter.getClientServiceMBeanName(server.getPort(),
         internalCache.getDistributedSystem().getDistributedMember());
-    service.unregisterMBean(cacheServerMBeanName);
+    service.unregisterMBean(objectName);
 
-    Notification notification = new Notification(JMXNotificationType.CACHE_SERVER_STOPPED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.CACHE_SERVER_STOPPED_PREFIX);
-
-    memberLevelNotifEmitter.sendNotification(notification);
+    Notification notification = new Notification(CACHE_SERVER_STOPPED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(), CACHE_SERVER_STOPPED_PREFIX);
+    memberLevelNotificationEmitter.sendNotification(notification);
 
     memberMBeanBridge.setCacheServer(false);
   }
 
-  /**
-   * Handles Cache removal. It will automatically remove all MBeans from GemFire Domain
-   *
-   * @param cache GemFire Cache instance. For now client cache is not supported
-   */
-  protected void handleCacheRemoval(Cache cache) throws ManagementException {
+  void handleCacheRemoval() throws ManagementException {
     if (!isServiceInitialised("handleCacheRemoval")) {
       return;
     }
 
-    this.serviceInitialised = false;
+    serviceInitialised = false;
     try {
       cleanUpMonitors();
       cleanBridgeResources();
@@ -746,48 +711,57 @@
     } catch (Exception e) {
       logger.warn(e.getMessage(), e);
     } finally {
-      this.internalCache = null;
-      this.service = null;
-      this.memberMBeanBridge = null;
-      this.memberBean = null;
-      this.memberLevelNotifEmitter = null;
+      internalCache = null;
+      service = null;
+      memberMBeanBridge = null;
+      memberBean = null;
+      memberLevelNotificationEmitter = null;
     }
   }
 
+  private String getNameOrId(DistributedMember member) {
+    if (member == null) {
+      return memberSource;
+    }
+    return isNotBlank(member.getName()) ? member.getName() : member.getId();
+  }
+
   private void cleanUpMonitors() {
-    MemberMBean bean = (MemberMBean) service.getMemberMXBean();
-    if (bean != null) {
-      bean.stopMonitor();
+    MemberMBean memberMBean = (MemberMBean) service.getMemberMXBean();
+    if (memberMBean != null) {
+      memberMBean.stopMonitor();
     }
 
-    Set<GatewaySender> senders = internalCache.getGatewaySenders();
+    Set<GatewaySender> gatewaySenders = internalCache.getGatewaySenders();
 
-    if (senders != null && senders.size() > 0) {
-      for (GatewaySender sender : senders) {
-        GatewaySenderMBean senderMBean =
-            (GatewaySenderMBean) service.getLocalGatewaySenderMXBean(sender.getId());
-        if (senderMBean != null) {
-          senderMBean.stopMonitor();
+    if (gatewaySenders != null && !gatewaySenders.isEmpty()) {
+      for (GatewaySender gatewaySender : gatewaySenders) {
+        GatewaySenderMBean gatewaySenderMBean =
+            (GatewaySenderMBean) service.getLocalGatewaySenderMXBean(gatewaySender.getId());
+        if (gatewaySenderMBean != null) {
+          gatewaySenderMBean.stopMonitor();
         }
       }
     }
 
-    GatewayReceiverMBean receiver = (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean();
-    if (receiver != null) {
-      receiver.stopMonitor();
+    GatewayReceiverMBean gatewayReceiverMBean =
+        (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean();
+    if (gatewayReceiverMBean != null) {
+      gatewayReceiverMBean.stopMonitor();
     }
   }
 
   private void cleanBridgeResources() {
-    List<CacheServer> servers = internalCache.getCacheServers();
+    List<CacheServer> cacheServers = internalCache.getCacheServers();
 
-    if (servers != null && servers.size() > 0) {
-      for (CacheServer server : servers) {
-        CacheServerMBean mbean =
-            (CacheServerMBean) service.getLocalCacheServerMXBean(server.getPort());
+    if (cacheServers != null && !cacheServers.isEmpty()) {
+      for (CacheServer cacheServer : cacheServers) {
+        CacheServerMBean cacheServerMBean =
+            (CacheServerMBean) service.getLocalCacheServerMXBean(cacheServer.getPort());
 
-        if (mbean != null) {
-          ClientMembershipListener listener = mbean.getBridge().getClientMembershipListener();
+        if (cacheServerMBean != null) {
+          ClientMembershipListener listener =
+              cacheServerMBean.getBridge().getClientMembershipListener();
 
           if (listener != null) {
             ClientMembership.unregisterClientMembershipListener(listener);
@@ -799,23 +773,24 @@
 
   /**
    * Handles particular region destroy or close operation it will remove the corresponding MBean
-   *
    */
-  protected void handleRegionRemoval(Region region) throws ManagementException {
+  void handleRegionRemoval(Region region) throws ManagementException {
     if (!isServiceInitialised("handleRegionRemoval")) {
       return;
     }
-    /*
-     * Moved region remove operation to a guarded block. If a region is getting created it wont
-     * allow it to destroy any region.
-     */
+
+    // Moved region remove operation to a guarded block. If a region is getting created it won't
+    // allow it to destroy any region.
 
     synchronized (regionOpLock) {
-      ObjectName regionMBeanName = MBeanJMXAdapter.getRegionMBeanName(
+      ObjectName objectName = MBeanJMXAdapter.getRegionMBeanName(
           internalCache.getDistributedSystem().getDistributedMember(), region.getFullPath());
-      RegionMBean bean;
+
       try {
-        bean = (RegionMBean) service.getLocalRegionMBean(region.getFullPath());
+        RegionMBean regionMBean = (RegionMBean) service.getLocalRegionMBean(region.getFullPath());
+        if (regionMBean != null) {
+          regionMBean.stopMonitor();
+        }
       } catch (ManagementException e) {
         // If no bean found its a NO-OP
         // Mostly for situation like DiskAccessException while creating region
@@ -826,37 +801,32 @@
         return;
       }
 
-      if (bean != null) {
-        bean.stopMonitor();
-      }
-      service.unregisterMBean(regionMBeanName);
+      service.unregisterMBean(objectName);
 
-      Notification notification = new Notification(JMXNotificationType.REGION_CLOSED, memberSource,
+      Notification notification = new Notification(REGION_CLOSED, memberSource,
           SequenceNumber.next(), System.currentTimeMillis(),
-          ManagementConstants.REGION_CLOSED_PREFIX + region.getFullPath());
-      memberLevelNotifEmitter.sendNotification(notification);
+          REGION_CLOSED_PREFIX + region.getFullPath());
+      memberLevelNotificationEmitter.sendNotification(notification);
+
       memberMBeanBridge.removeRegion(region);
     }
   }
 
-  /**
-   * Handles DiskStore Removal
-   *
-   */
-  protected void handleDiskRemoval(DiskStore disk) throws ManagementException {
+  void handleDiskRemoval(DiskStore diskStore) throws ManagementException {
     if (!isServiceInitialised("handleDiskRemoval")) {
       return;
     }
 
-    ObjectName diskStoreMBeanName = MBeanJMXAdapter.getDiskStoreMBeanName(
-        internalCache.getDistributedSystem().getDistributedMember(), disk.getName());
+    ObjectName objectName = MBeanJMXAdapter.getDiskStoreMBeanName(
+        internalCache.getDistributedSystem().getDistributedMember(), diskStore.getName());
 
-    DiskStoreMBean bean;
     try {
-      bean = (DiskStoreMBean) service.getLocalDiskStoreMBean(disk.getName());
-      if (bean == null) {
+      DiskStoreMBean diskStoreMBean =
+          (DiskStoreMBean) service.getLocalDiskStoreMBean(diskStore.getName());
+      if (diskStoreMBean == null) {
         return;
       }
+      diskStoreMBean.stopMonitor();
     } catch (ManagementException e) {
       // If no bean found its a NO-OP
       if (logger.isDebugEnabled()) {
@@ -865,192 +835,193 @@
       return;
     }
 
-    bean.stopMonitor();
+    service.unregisterMBean(objectName);
 
-    service.unregisterMBean(diskStoreMBeanName);
+    Notification notification = new Notification(DISK_STORE_CLOSED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(),
+        DISK_STORE_CLOSED_PREFIX + diskStore.getName());
+    memberLevelNotificationEmitter.sendNotification(notification);
 
-    Notification notification = new Notification(JMXNotificationType.DISK_STORE_CLOSED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.DISK_STORE_CLOSED_PREFIX + disk.getName());
-    memberLevelNotifEmitter.sendNotification(notification);
-    memberMBeanBridge.removeDiskStore(disk);
+    memberMBeanBridge.removeDiskStore(diskStore);
   }
 
-  /**
-   * Handles Lock Service Removal
-   *
-   * @param lockService lock service instance
-   */
-  protected void handleLockServiceRemoval(DLockService lockService) throws ManagementException {
+  void handleLockServiceRemoval(DLockService lockService) throws ManagementException {
     if (!isServiceInitialised("handleLockServiceRemoval")) {
       return;
     }
 
-    ObjectName lockServiceMBeanName = MBeanJMXAdapter.getLockServiceMBeanName(
+    ObjectName objectName = MBeanJMXAdapter.getLockServiceMBeanName(
         internalCache.getDistributedSystem().getDistributedMember(), lockService.getName());
 
-    LockServiceMXBean bean = service.getLocalLockServiceMBean(lockService.getName());
+    service.unregisterMBean(objectName);
 
-    service.unregisterMBean(lockServiceMBeanName);
-
-    Notification notification = new Notification(JMXNotificationType.LOCK_SERVICE_CLOSED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.LOCK_SERVICE_CLOSED_PREFIX + lockService.getName());
-    memberLevelNotifEmitter.sendNotification(notification);
+    Notification notification = new Notification(LOCK_SERVICE_CLOSED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(),
+        LOCK_SERVICE_CLOSED_PREFIX + lockService.getName());
+    memberLevelNotificationEmitter.sendNotification(notification);
   }
 
   /**
    * Handles management side call backs for a locator creation and start. Assumption is a cache will
    * be created before hand.
    *
+   * <p>
    * There is no corresponding handleStopLocator() method. Locator will close the cache whenever its
    * stopped and it should also shutdown all the management services by closing the cache.
    *
    * @param locator instance of locator which is getting started
    */
-  protected void handleLocatorStart(Locator locator) throws ManagementException {
+  void handleLocatorStart(Locator locator) throws ManagementException {
     if (!isServiceInitialised("handleLocatorCreation")) {
       return;
     }
 
-    ObjectName locatorMBeanName = MBeanJMXAdapter
+    ObjectName objectName = MBeanJMXAdapter
         .getLocatorMBeanName(internalCache.getDistributedSystem().getDistributedMember());
+    LocatorMBeanBridge locatorMBeanBridge = new LocatorMBeanBridge(locator);
+    LocatorMBean locatorMBean = new LocatorMBean(locatorMBeanBridge);
+    ObjectName federatedName = service.registerInternalMBean(locatorMBean, objectName);
+    service.federate(federatedName, LocatorMXBean.class, true);
 
-    LocatorMBeanBridge bridge = new LocatorMBeanBridge(locator);
-    LocatorMBean locatorMBean = new LocatorMBean(bridge);
-
-    ObjectName changedMBeanName = service.registerInternalMBean(locatorMBean, locatorMBeanName);
-
-    service.federate(changedMBeanName, LocatorMXBean.class, true);
-
-    Notification notification =
-        new Notification(JMXNotificationType.LOCATOR_STARTED, memberSource, SequenceNumber.next(),
-            System.currentTimeMillis(), ManagementConstants.LOCATOR_STARTED_PREFIX);
-
-    memberLevelNotifEmitter.sendNotification(notification);
-
+    Notification notification = new Notification(LOCATOR_STARTED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(), LOCATOR_STARTED_PREFIX);
+    memberLevelNotificationEmitter.sendNotification(notification);
   }
 
-  protected void handleGatewaySenderStart(GatewaySender sender) throws ManagementException {
+  void handleGatewaySenderStart(GatewaySender gatewaySender) throws ManagementException {
     if (!isServiceInitialised("handleGatewaySenderStart")) {
       return;
     }
-    if ((sender.getRemoteDSId() < 0)) {
+    if (gatewaySender.getRemoteDSId() < 0) {
       return;
     }
-    GatewaySenderMBean bean =
-        (GatewaySenderMBean) service.getLocalGatewaySenderMXBean(sender.getId());
 
-    bean.getBridge().setDispatcher();
+    GatewaySenderMBean gatewaySenderMBean =
+        (GatewaySenderMBean) service.getLocalGatewaySenderMXBean(gatewaySender.getId());
 
-    Notification notification = new Notification(JMXNotificationType.GATEWAY_SENDER_STARTED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.GATEWAY_SENDER_STARTED_PREFIX + sender.getId());
+    gatewaySenderMBean.getBridge().setDispatcher();
 
-    memberLevelNotifEmitter.sendNotification(notification);
+    Notification notification = new Notification(GATEWAY_SENDER_STARTED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(),
+        GATEWAY_SENDER_STARTED_PREFIX + gatewaySender.getId());
+    memberLevelNotificationEmitter.sendNotification(notification);
   }
 
-  protected void handleGatewaySenderStop(GatewaySender sender) throws ManagementException {
+  void handleGatewaySenderStop(GatewaySender gatewaySender) throws ManagementException {
     if (!isServiceInitialised("handleGatewaySenderStop")) {
       return;
     }
 
-    Notification notification = new Notification(JMXNotificationType.GATEWAY_SENDER_STOPPED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.GATEWAY_SENDER_STOPPED_PREFIX + sender.getId());
-
-    memberLevelNotifEmitter.sendNotification(notification);
+    Notification notification = new Notification(GATEWAY_SENDER_STOPPED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(),
+        GATEWAY_SENDER_STOPPED_PREFIX + gatewaySender.getId());
+    memberLevelNotificationEmitter.sendNotification(notification);
   }
 
-  protected void handleGatewaySenderPaused(GatewaySender sender) throws ManagementException {
+  void handleGatewaySenderPaused(GatewaySender gatewaySender) throws ManagementException {
     if (!isServiceInitialised("handleGatewaySenderPaused")) {
       return;
     }
 
-    Notification notification = new Notification(JMXNotificationType.GATEWAY_SENDER_PAUSED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.GATEWAY_SENDER_PAUSED_PREFIX + sender.getId());
-
-    memberLevelNotifEmitter.sendNotification(notification);
+    Notification notification = new Notification(GATEWAY_SENDER_PAUSED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(),
+        GATEWAY_SENDER_PAUSED_PREFIX + gatewaySender.getId());
+    memberLevelNotificationEmitter.sendNotification(notification);
   }
 
-  protected void handleGatewaySenderResumed(GatewaySender sender) throws ManagementException {
+  void handleGatewaySenderResumed(GatewaySender gatewaySender) throws ManagementException {
     if (!isServiceInitialised("handleGatewaySenderResumed")) {
       return;
     }
 
-    Notification notification = new Notification(JMXNotificationType.GATEWAY_SENDER_RESUMED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.GATEWAY_SENDER_RESUMED_PREFIX + sender.getId());
-
-    memberLevelNotifEmitter.sendNotification(notification);
+    Notification notification = new Notification(GATEWAY_SENDER_RESUMED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(),
+        GATEWAY_SENDER_RESUMED_PREFIX + gatewaySender.getId());
+    memberLevelNotificationEmitter.sendNotification(notification);
   }
 
-  protected void handleGatewaySenderRemoved(GatewaySender sender) throws ManagementException {
+  void handleGatewaySenderRemoved(GatewaySender gatewaySender) throws ManagementException {
     if (!isServiceInitialised("handleGatewaySenderRemoved")) {
       return;
     }
-    if ((sender.getRemoteDSId() < 0)) {
+    if (gatewaySender.getRemoteDSId() < 0) {
       return;
     }
 
-    GatewaySenderMBean bean =
-        (GatewaySenderMBean) service.getLocalGatewaySenderMXBean(sender.getId());
-    bean.stopMonitor();
+    GatewaySenderMBean gatewaySenderMBean =
+        (GatewaySenderMBean) service.getLocalGatewaySenderMXBean(gatewaySender.getId());
+    gatewaySenderMBean.stopMonitor();
 
-    ObjectName gatewaySenderName = MBeanJMXAdapter.getGatewaySenderMBeanName(
-        internalCache.getDistributedSystem().getDistributedMember(), sender.getId());
-    service.unregisterMBean(gatewaySenderName);
+    ObjectName objectName = MBeanJMXAdapter.getGatewaySenderMBeanName(
+        internalCache.getDistributedSystem().getDistributedMember(), gatewaySender.getId());
+    service.unregisterMBean(objectName);
 
-    Notification notification = new Notification(JMXNotificationType.GATEWAY_SENDER_REMOVED,
-        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-        ManagementConstants.GATEWAY_SENDER_REMOVED_PREFIX + sender.getId());
-    memberLevelNotifEmitter.sendNotification(notification);
+    Notification notification = new Notification(GATEWAY_SENDER_REMOVED, memberSource,
+        SequenceNumber.next(), System.currentTimeMillis(),
+        GATEWAY_SENDER_REMOVED_PREFIX + gatewaySender.getId());
+    memberLevelNotificationEmitter.sendNotification(notification);
   }
 
-  protected void handleCacheServiceCreation(CacheService cacheService) throws ManagementException {
+  void handleCacheServiceCreation(CacheService cacheService) throws ManagementException {
     if (!isServiceInitialised("handleCacheServiceCreation")) {
       return;
     }
+
     // Don't register the CacheServices in the Locator
     InternalDistributedMember member =
         internalCache.getInternalDistributedSystem().getDistributedMember();
     if (member.getVmKind() == ClusterDistributionManager.LOCATOR_DM_TYPE) {
       return;
     }
-    CacheServiceMBeanBase mbean = cacheService.getMBean();
-    if (mbean != null) {
-      String id = mbean.getId();
-      ObjectName cacheServiceObjectName = MBeanJMXAdapter.getCacheServiceMBeanName(member, id);
 
-      ObjectName changedMBeanName = service.registerInternalMBean(mbean, cacheServiceObjectName);
+    CacheServiceMBeanBase cacheServiceMBean = cacheService.getMBean();
+    if (cacheServiceMBean != null) {
+      String id = cacheServiceMBean.getId();
+      ObjectName objectName = MBeanJMXAdapter.getCacheServiceMBeanName(member, id);
+      ObjectName federatedName = service.registerInternalMBean(cacheServiceMBean, objectName);
+      service.federate(federatedName, cacheServiceMBean.getInterfaceClass(), true);
 
-      service.federate(changedMBeanName, mbean.getInterfaceClass(), true);
-
-      Notification notification = new Notification(JMXNotificationType.CACHE_SERVICE_CREATED,
-          memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-          ManagementConstants.CACHE_SERVICE_CREATED_PREFIX + id);
-      memberLevelNotifEmitter.sendNotification(notification);
+      Notification notification = new Notification(CACHE_SERVICE_CREATED, memberSource,
+          SequenceNumber.next(), System.currentTimeMillis(),
+          CACHE_SERVICE_CREATED_PREFIX + id);
+      memberLevelNotificationEmitter.sendNotification(notification);
     }
   }
 
+  private ObjectName aggregateMBeanPattern() {
+    try {
+      return new ObjectName(AGGREGATE_MBEAN_PATTERN);
+    } catch (MalformedObjectNameException | NullPointerException e) {
+      throw new ManagementException(e);
+    }
+  }
+
+  private boolean isServiceInitialised(String method) {
+    if (!serviceInitialised) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Management Service is not initialised hence returning from {}", method);
+      }
+    }
+
+    return serviceInitialised;
+  }
+
   /**
-   * Private class which acts as a ClientMembershipListener to propagate client joined/left
-   * notifications
+   * Propagates client joined/left notifications
    */
   private static class CacheServerMembershipListenerAdapter
       extends ClientMembershipListenerAdapter {
 
-    private NotificationBroadcasterSupport serverLevelNotifEmitter;
-    private NotificationBroadcasterSupport memberLevelNotifEmitter;
+    private final NotificationBroadcasterSupport serverLevelNotificationEmitter;
+    private final NotificationBroadcasterSupport memberLevelNotificationEmitter;
+    private final String serverSource;
 
-    private String serverSource;
-
-    public CacheServerMembershipListenerAdapter(
-        NotificationBroadcasterSupport serverLevelNotifEmitter,
-        NotificationBroadcasterSupport memberLevelNotifEmitter, ObjectName serverSource) {
-      this.serverLevelNotifEmitter = serverLevelNotifEmitter;
-      this.memberLevelNotifEmitter = memberLevelNotifEmitter;
+    private CacheServerMembershipListenerAdapter(
+        NotificationBroadcasterSupport serverLevelNotificationEmitter,
+        NotificationBroadcasterSupport memberLevelNotificationEmitter,
+        ObjectName serverSource) {
+      this.serverLevelNotificationEmitter = serverLevelNotificationEmitter;
+      this.memberLevelNotificationEmitter = memberLevelNotificationEmitter;
       this.serverSource = serverSource.toString();
     }
 
@@ -1060,11 +1031,12 @@
      */
     @Override
     public void memberJoined(ClientMembershipEvent event) {
-      Notification notification = new Notification(JMXNotificationType.CLIENT_JOINED, serverSource,
+      Notification notification = new Notification(CLIENT_JOINED, serverSource,
           SequenceNumber.next(), System.currentTimeMillis(),
-          ManagementConstants.CLIENT_JOINED_PREFIX + event.getMemberId());
-      serverLevelNotifEmitter.sendNotification(notification);
-      memberLevelNotifEmitter.sendNotification(notification);
+          CLIENT_JOINED_PREFIX + event.getMemberId());
+
+      serverLevelNotificationEmitter.sendNotification(notification);
+      memberLevelNotificationEmitter.sendNotification(notification);
     }
 
     /**
@@ -1073,11 +1045,12 @@
      */
     @Override
     public void memberLeft(ClientMembershipEvent event) {
-      Notification notification = new Notification(JMXNotificationType.CLIENT_LEFT, serverSource,
+      Notification notification = new Notification(CLIENT_LEFT, serverSource,
           SequenceNumber.next(), System.currentTimeMillis(),
-          ManagementConstants.CLIENT_LEFT_PREFIX + event.getMemberId());
-      serverLevelNotifEmitter.sendNotification(notification);
-      memberLevelNotifEmitter.sendNotification(notification);
+          CLIENT_LEFT_PREFIX + event.getMemberId());
+
+      serverLevelNotificationEmitter.sendNotification(notification);
+      memberLevelNotificationEmitter.sendNotification(notification);
     }
 
     /**
@@ -1086,24 +1059,12 @@
      */
     @Override
     public void memberCrashed(ClientMembershipEvent event) {
-      Notification notification = new Notification(JMXNotificationType.CLIENT_CRASHED, serverSource,
+      Notification notification = new Notification(CLIENT_CRASHED, serverSource,
           SequenceNumber.next(), System.currentTimeMillis(),
-          ManagementConstants.CLIENT_CRASHED_PREFIX + event.getMemberId());
-      serverLevelNotifEmitter.sendNotification(notification);
-      memberLevelNotifEmitter.sendNotification(notification);
+          CLIENT_CRASHED_PREFIX + event.getMemberId());
+
+      serverLevelNotificationEmitter.sendNotification(notification);
+      memberLevelNotificationEmitter.sendNotification(notification);
     }
-
   }
-
-  private boolean isServiceInitialised(String method) {
-    if (!serviceInitialised) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Management Service is not initialised hence returning from {}", method);
-      }
-      return false;
-    }
-
-    return true;
-  }
-
 }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
index a82a7fb..12edec3 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
@@ -128,7 +128,7 @@
           break;
         case CACHE_REMOVE:
           InternalCache removedCache = (InternalCache) resource;
-          adapter.handleCacheRemoval(removedCache);
+          adapter.handleCacheRemoval();
           break;
         case REGION_CREATE:
           Region createdRegion = (Region) resource;
@@ -152,7 +152,7 @@
           break;
         case GATEWAYRECEIVER_DESTROY:
           GatewayReceiver destroyedRecv = (GatewayReceiver) resource;
-          adapter.handleGatewayReceiverDestroy(destroyedRecv);
+          adapter.handleGatewayReceiverDestroy();
           break;
         case GATEWAYRECEIVER_START:
           GatewayReceiver startedRecv = (GatewayReceiver) resource;
@@ -160,7 +160,7 @@
           break;
         case GATEWAYRECEIVER_STOP:
           GatewayReceiver stoppededRecv = (GatewayReceiver) resource;
-          adapter.handleGatewayReceiverStop(stoppededRecv);
+          adapter.handleGatewayReceiverStop();
           break;
         case GATEWAYSENDER_CREATE:
           GatewaySender sender = (GatewaySender) resource;
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java
index b776bed..c233935 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java
@@ -17,6 +17,7 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.when;
@@ -25,42 +26,47 @@
 
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.mockito.ArgumentCaptor;
 
 import org.apache.geode.StatisticsFactory;
 import org.apache.geode.alerting.internal.spi.AlertLevel;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.HasCachePerfStats;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalCacheForClientAccess;
 import org.apache.geode.internal.cache.InternalRegionArguments;
+import org.apache.geode.internal.logging.LoggingExecutors;
 import org.apache.geode.internal.statistics.StatisticsClock;
 import org.apache.geode.management.DistributedSystemMXBean;
+import org.apache.geode.test.junit.categories.JMXTest;
 
+@Category(JMXTest.class)
 public class FederatingManagerTest {
 
+  private InternalCache cache;
+  private InternalCacheForClientAccess cacheForClientAccess;
   private MBeanJMXAdapter jmxAdapter;
   private ManagementResourceRepo repo;
-  private InternalDistributedSystem system;
   private SystemManagementService service;
-  private InternalCache cache;
   private StatisticsFactory statisticsFactory;
   private StatisticsClock statisticsClock;
-  private InternalCacheForClientAccess cacheForClientAccess;
+  private InternalDistributedSystem system;
 
   @Before
   public void setUp() throws Exception {
+    cache = mock(InternalCache.class);
+    cacheForClientAccess = mock(InternalCacheForClientAccess.class);
     jmxAdapter = mock(MBeanJMXAdapter.class);
     repo = mock(ManagementResourceRepo.class);
-    system = mock(InternalDistributedSystem.class);
     service = mock(SystemManagementService.class);
-    cache = mock(InternalCache.class);
-    statisticsFactory = mock(StatisticsFactory.class);
     statisticsClock = mock(StatisticsClock.class);
-    cacheForClientAccess = mock(InternalCacheForClientAccess.class);
+    statisticsFactory = mock(StatisticsFactory.class);
+    system = mock(InternalDistributedSystem.class);
+
     DistributedSystemMXBean distributedSystemMXBean = mock(DistributedSystemMXBean.class);
 
     when(cache.getCacheForProcessingClientRequests())
@@ -77,66 +83,182 @@
 
   @Test
   public void addMemberArtifactsCreatesMonitoringRegion() throws Exception {
-    InternalDistributedMember member = member(1, 20);
-    FederatingManager federatingManager = new FederatingManager(jmxAdapter, repo, system, service,
-        cache, statisticsFactory, statisticsClock);
+    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
+        statisticsFactory, statisticsClock,
+        new MBeanProxyFactory(jmxAdapter, service),
+        new MemberMessenger(jmxAdapter, system),
+        LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+            Runtime.getRuntime().availableProcessors()));
     federatingManager.startManager();
 
-    federatingManager.addMemberArtifacts(member);
+    federatingManager.addMemberArtifacts(member(1, 20));
 
-    verify(cacheForClientAccess).createInternalRegion(eq("_monitoringRegion_null<v1>20"), any(),
-        any());
+    verify(cacheForClientAccess)
+        .createInternalRegion(eq("_monitoringRegion_null<v1>20"), any(), any());
   }
 
   @Test
   public void addMemberArtifactsCreatesMonitoringRegionWithHasOwnStats() throws Exception {
-    InternalDistributedMember member = member(2, 40);
-    FederatingManager federatingManager = new FederatingManager(jmxAdapter, repo, system, service,
-        cache, statisticsFactory, statisticsClock);
+    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
+        statisticsFactory, statisticsClock,
+        new MBeanProxyFactory(jmxAdapter, service),
+        new MemberMessenger(jmxAdapter, system),
+        LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+            Runtime.getRuntime().availableProcessors()));
     federatingManager.startManager();
 
-    federatingManager.addMemberArtifacts(member);
+    federatingManager.addMemberArtifacts(member(2, 40));
 
     ArgumentCaptor<InternalRegionArguments> captor =
         ArgumentCaptor.forClass(InternalRegionArguments.class);
-    verify(cacheForClientAccess).createInternalRegion(eq("_monitoringRegion_null<v2>40"), any(),
-        captor.capture());
-
-    InternalRegionArguments internalRegionArguments = captor.getValue();
-    HasCachePerfStats hasCachePerfStats = internalRegionArguments.getCachePerfStatsHolder();
-    assertThat(hasCachePerfStats.hasOwnStats()).isTrue();
+    verify(cacheForClientAccess)
+        .createInternalRegion(eq("_monitoringRegion_null<v2>40"), any(), captor.capture());
+    boolean hasOwnStats = captor.getValue().getCachePerfStatsHolder().hasOwnStats();
+    assertThat(hasOwnStats)
+        .isTrue();
   }
 
   @Test
   public void addMemberArtifactsCreatesNotificationRegion() throws Exception {
-    InternalDistributedMember member = member(3, 60);
-    FederatingManager federatingManager = new FederatingManager(jmxAdapter, repo, system, service,
-        cache, statisticsFactory, statisticsClock);
+    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
+        statisticsFactory, statisticsClock,
+        new MBeanProxyFactory(jmxAdapter, service),
+        new MemberMessenger(jmxAdapter, system),
+        LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+            Runtime.getRuntime().availableProcessors()));
     federatingManager.startManager();
 
-    federatingManager.addMemberArtifacts(member);
+    federatingManager.addMemberArtifacts(member(3, 60));
 
-    verify(cacheForClientAccess).createInternalRegion(eq("_notificationRegion_null<v3>60"), any(),
-        any());
+    verify(cacheForClientAccess)
+        .createInternalRegion(eq("_notificationRegion_null<v3>60"), any(), any());
   }
 
   @Test
   public void addMemberArtifactsCreatesNotificationRegionWithHasOwnStats() throws Exception {
-    InternalDistributedMember member = member(4, 80);
-    FederatingManager federatingManager = new FederatingManager(jmxAdapter, repo, system, service,
-        cache, statisticsFactory, statisticsClock);
+    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
+        statisticsFactory, statisticsClock,
+        new MBeanProxyFactory(jmxAdapter, service),
+        new MemberMessenger(jmxAdapter, system),
+        LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+            Runtime.getRuntime().availableProcessors()));
     federatingManager.startManager();
 
-    federatingManager.addMemberArtifacts(member);
+    federatingManager.addMemberArtifacts(member(4, 80));
 
     ArgumentCaptor<InternalRegionArguments> captor =
         ArgumentCaptor.forClass(InternalRegionArguments.class);
-    verify(cacheForClientAccess).createInternalRegion(eq("_notificationRegion_null<v4>80"), any(),
-        captor.capture());
+    verify(cacheForClientAccess)
+        .createInternalRegion(eq("_notificationRegion_null<v4>80"), any(), captor.capture());
 
     InternalRegionArguments internalRegionArguments = captor.getValue();
-    HasCachePerfStats hasCachePerfStats = internalRegionArguments.getCachePerfStatsHolder();
-    assertThat(hasCachePerfStats.hasOwnStats()).isTrue();
+    assertThat(internalRegionArguments.getCachePerfStatsHolder().hasOwnStats())
+        .isTrue();
+  }
+
+  @Test
+  public void removeMemberArtifactsLocallyDestroysMonitoringRegion() {
+    InternalDistributedMember member = member();
+    Region monitoringRegion = mock(Region.class);
+    when(repo.getEntryFromMonitoringRegionMap(eq(member)))
+        .thenReturn(monitoringRegion);
+    when(repo.getEntryFromNotifRegionMap(eq(member)))
+        .thenReturn(mock(Region.class));
+    when(system.getDistributedMember())
+        .thenReturn(member);
+    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
+        statisticsFactory, statisticsClock,
+        new MBeanProxyFactory(jmxAdapter, service),
+        new MemberMessenger(jmxAdapter, system),
+        LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+            Runtime.getRuntime().availableProcessors()));
+
+    federatingManager.removeMemberArtifacts(member, false);
+
+    verify(monitoringRegion)
+        .localDestroyRegion();
+  }
+
+  @Test
+  public void removeMemberArtifactsLocallyDestroysNotificationRegion() {
+    InternalDistributedMember member = member();
+    Region notificationRegion = mock(Region.class);
+    when(repo.getEntryFromMonitoringRegionMap(eq(member)))
+        .thenReturn(mock(Region.class));
+    when(repo.getEntryFromNotifRegionMap(eq(member)))
+        .thenReturn(notificationRegion);
+    when(system.getDistributedMember())
+        .thenReturn(member);
+    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
+        statisticsFactory, statisticsClock,
+        new MBeanProxyFactory(jmxAdapter, service),
+        new MemberMessenger(jmxAdapter, system),
+        LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+            Runtime.getRuntime().availableProcessors()));
+
+    federatingManager.removeMemberArtifacts(member, false);
+
+    verify(notificationRegion)
+        .localDestroyRegion();
+  }
+
+  @Test
+  public void removeMemberArtifactsDoesNotThrowIfMonitoringRegionIsAlreadyDestroyed() {
+    InternalDistributedMember member = member();
+    Region monitoringRegion = mock(Region.class);
+    doThrow(new RegionDestroyedException("test", "monitoringRegion"))
+        .when(monitoringRegion).localDestroyRegion();
+    when(repo.getEntryFromMonitoringRegionMap(eq(member)))
+        .thenReturn(monitoringRegion);
+    when(repo.getEntryFromNotifRegionMap(eq(member)))
+        .thenReturn(mock(Region.class));
+    when(system.getDistributedMember())
+        .thenReturn(member);
+    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
+        statisticsFactory, statisticsClock,
+        new MBeanProxyFactory(jmxAdapter, service),
+        new MemberMessenger(jmxAdapter, system),
+        LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+            Runtime.getRuntime().availableProcessors()));
+
+    federatingManager.removeMemberArtifacts(member, false);
+
+    verify(monitoringRegion)
+        .localDestroyRegion();
+  }
+
+  @Test
+  public void removeMemberArtifactsDoesNotThrowIfNotificationRegionIsAlreadyDestroyed() {
+    InternalDistributedMember member = member();
+    Region notificationRegion = mock(Region.class);
+    doThrow(new RegionDestroyedException("test", "notificationRegion"))
+        .when(notificationRegion).localDestroyRegion();
+    when(repo.getEntryFromMonitoringRegionMap(eq(member)))
+        .thenReturn(mock(Region.class));
+    when(repo.getEntryFromNotifRegionMap(eq(member)))
+        .thenReturn(notificationRegion);
+    when(system.getDistributedMember())
+        .thenReturn(member);
+    FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
+        statisticsFactory, statisticsClock,
+        new MBeanProxyFactory(jmxAdapter, service),
+        new MemberMessenger(jmxAdapter, system),
+        LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+            Runtime.getRuntime().availableProcessors()));
+
+    federatingManager.removeMemberArtifacts(member, false);
+
+    verify(notificationRegion)
+        .localDestroyRegion();
+  }
+
+  @Test
+  public void removeMemberArtifactsDoesNotThrowIfMBeanProxyFactoryThrowsRegionDestroyedException() {
+
+  }
+
+  private InternalDistributedMember member() {
+    return member(1, 1);
   }
 
   private InternalDistributedMember member(int viewId, int port) {