GEODE-7330: Prevent RegionDestroyedException in FederatingManager (#4193)
* GEODE-7330: Prevent RegionDestroyedException in FederatingManager
Cleanup management classes:
* Reduce method and field visibility as much as possible
* Remove unnecessary uses of this
* Reorder fields and methods based on visibility and other modifiers
* Use @VisibleForTesting annotation
* Fixup formatting and variable names
Make all FederatingManager fields final:
* Remove field setters for tests
* Introduce FederatingManagerFactory
* Add FederatingManagerFactory system property for tests to
SystemManagementService
* Inject all FederatingManager fields via constructor
* Use Geode APIs in MBeanFederationErrorPathDUnitTest
Rename MBeanFederationErrorHandlingDistributedTest:
* Rename MBeanFederationErrorPathDUnitTest as
MBeanFederationErrorHandlingDistributedTest
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java
new file mode 100644
index 0000000..06be411
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.management.internal.SystemManagementService.FEDERATING_MANAGER_FACTORY_PROPERTY;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getController;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.ExecutorService;
+
+import javax.management.ObjectName;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.stubbing.Answer;
+
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.distributed.LocatorLauncher;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.statistics.StatisticsClock;
+import org.apache.geode.management.ManagementService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.dunit.rules.SharedErrorCollector;
+import org.apache.geode.test.junit.categories.JMXTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+
+@Category(JMXTest.class)
+public class MBeanFederationErrorHandlingDistributedTest implements Serializable {
+
+ private static final String REGION_NAME = "test-region-1";
+
+ private static LocatorLauncher locatorLauncher;
+ private static ServerLauncher serverLauncher;
+ private static MBeanProxyFactory proxyFactory;
+
+ private ObjectName regionMXBeanName;
+ private String locatorName;
+ private String serverName;
+ private int locatorPort;
+ private VM locatorVM;
+ private VM serverVM;
+
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule();
+
+ @Rule
+ public SharedErrorCollector errorCollector = new SharedErrorCollector();
+
+ @Rule
+ public DistributedRestoreSystemProperties restoreSystemProperties =
+ new DistributedRestoreSystemProperties();
+
+ @Rule
+ public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+ @Before
+ public void setUp() throws Exception {
+ locatorName = "locator";
+ serverName = "server";
+ regionMXBeanName =
+ new ObjectName(String.format("GemFire:service=Region,name=\"%s\",type=Member,member=%s",
+ SEPARATOR + REGION_NAME, serverName));
+
+ locatorVM = getController();
+ serverVM = getVM(0);
+
+ locatorPort = locatorVM.invoke(this::startLocator);
+
+ serverVM.invoke(this::startServer);
+ }
+
+ @After
+ public void tearDown() {
+ locatorVM.invoke(() -> {
+ if (locatorLauncher != null) {
+ locatorLauncher.stop();
+ locatorLauncher = null;
+ proxyFactory = null;
+ }
+ });
+
+ serverVM.invoke(() -> {
+ if (serverLauncher != null) {
+ serverLauncher.stop();
+ serverLauncher = null;
+ }
+ });
+ }
+
+ @Test
+ public void destroyMBeanBeforeFederationCompletes() {
+ locatorVM.invoke(() -> doAnswer((Answer<Void>) invocation -> {
+ serverVM.invoke(() -> {
+ Region region = serverLauncher.getCache().getRegion(REGION_NAME);
+ region.destroyRegion();
+ });
+
+ Region<String, Object> monitoringRegion = invocation.getArgument(2);
+ monitoringRegion.destroy(regionMXBeanName.toString());
+
+ assertThat(monitoringRegion.get(regionMXBeanName.toString())).isNull();
+
+ try {
+ invocation.callRealMethod();
+ } catch (Exception e) {
+ if (!locatorLauncher.getCache().isClosed()) {
+ errorCollector.addError(e);
+ }
+ }
+
+ return null;
+ })
+ .when(proxyFactory).createProxy(any(), eq(regionMXBeanName), any(), any()));
+
+ serverVM.invoke(() -> {
+ serverLauncher.getCache().createRegionFactory(REPLICATE).create(REGION_NAME);
+ });
+
+ locatorVM.invoke(() -> {
+ await().untilAsserted(
+ () -> verify(proxyFactory).createProxy(any(), eq(regionMXBeanName), any(), any()));
+ });
+ }
+
+ private int startLocator() throws IOException {
+ System.setProperty(FEDERATING_MANAGER_FACTORY_PROPERTY,
+ FederatingManagerFactoryWithSpy.class.getName());
+
+ locatorLauncher = new LocatorLauncher.Builder()
+ .setMemberName(locatorName)
+ .setPort(0)
+ .setWorkingDirectory(temporaryFolder.newFolder(locatorName).getAbsolutePath())
+ .set(HTTP_SERVICE_PORT, "0")
+ .set(JMX_MANAGER_PORT, "0")
+ .build();
+
+ locatorLauncher.start();
+
+ Cache cache = locatorLauncher.getCache();
+
+ SystemManagementService service =
+ (SystemManagementService) ManagementService.getManagementService(cache);
+ service.startManager();
+ FederatingManager federatingManager = service.getFederatingManager();
+ proxyFactory = federatingManager.getProxyFactory();
+
+ return locatorLauncher.getPort();
+ }
+
+ private void startServer() throws IOException {
+ serverLauncher = new ServerLauncher.Builder()
+ .setDisableDefaultServer(true)
+ .setMemberName(serverName)
+ .setWorkingDirectory(temporaryFolder.newFolder(serverName).getAbsolutePath())
+ .set(HTTP_SERVICE_PORT, "0")
+ .set(LOCATORS, "localHost[" + locatorPort + "]")
+ .build();
+
+ serverLauncher.start();
+ }
+
+ private static class FederatingManagerFactoryWithSpy implements FederatingManagerFactory {
+
+ public FederatingManagerFactoryWithSpy() {
+ // must be public for instantiation by reflection
+ }
+
+ @Override
+ public FederatingManager create(ManagementResourceRepo repo, InternalDistributedSystem system,
+ SystemManagementService service, InternalCache cache, StatisticsFactory statisticsFactory,
+ StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory, MemberMessenger messenger,
+ ExecutorService executorService) {
+ return new FederatingManager(repo, system, service, cache, statisticsFactory,
+ statisticsClock, spy(proxyFactory), messenger, executorService);
+ }
+ }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorPathDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorPathDUnitTest.java
deleted file mode 100644
index f3f17b0..0000000
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorPathDUnitTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.management.internal;
-
-import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.spy;
-
-import java.rmi.RemoteException;
-
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.management.ManagementService;
-import org.apache.geode.test.dunit.internal.InternalBlackboard;
-import org.apache.geode.test.dunit.internal.InternalBlackboardImpl;
-import org.apache.geode.test.dunit.rules.ClusterStartupRule;
-import org.apache.geode.test.dunit.rules.MemberVM;
-import org.apache.geode.test.junit.categories.JMXTest;
-import org.apache.geode.test.junit.rules.LocatorStarterRule;
-
-@Category({JMXTest.class})
-public class MBeanFederationErrorPathDUnitTest {
- private static final int SERVER_1_VM_INDEX = 1;
- private static final String REGION_NAME = "test-region-1";
-
- public MemberVM server1, server2, server3;
-
- @Rule
- public LocatorStarterRule locator1 = new LocatorStarterRule();
-
- @Rule
- public ClusterStartupRule lsRule = new ClusterStartupRule();
-
-
- private InternalBlackboard bb;
-
- @Before
- public void before() throws Exception {
- locator1.withJMXManager().startLocator();
-
- bb = InternalBlackboardImpl.getInstance();
- }
-
- @Test
- public void destroyMBeanBeforeFederationCompletes()
- throws MalformedObjectNameException, RemoteException {
- String bbKey = "sync1";
-
- String beanName = "GemFire:service=Region,name=\"/test-region-1\",type=Member,member=server-1";
- ObjectName objectName = new ObjectName(beanName);
-
- InternalCache cache = locator1.getCache();
- SystemManagementService service =
- (SystemManagementService) ManagementService.getManagementService(cache);
- FederatingManager federatingManager = service.getFederatingManager();
- MBeanProxyFactory mBeanProxyFactory = federatingManager.getProxyFactory();
- MBeanProxyFactory spy = spy(mBeanProxyFactory);
- service.getFederatingManager().setProxyFactory(spy);
-
- Answer answer1 = new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- server1.invoke(() -> {
- InternalCache serverCache = ClusterStartupRule.getCache();
- Region region = serverCache.getRegionByPath("/" + REGION_NAME);
- region.destroyRegion();
- });
-
- Region<String, Object> monitoringRegion = invocation.getArgument(2);
- monitoringRegion.destroy(objectName.toString());
-
- assertThat((monitoringRegion).get(objectName.toString())).isNull();
-
- try {
- invocation.callRealMethod();
- } catch (Exception e) {
- bb.setMailbox(bbKey, e);
- return null;
- }
- bb.setMailbox(bbKey, "this is fine");
- return null;
- }
- };
-
- doAnswer(answer1).when(spy).createProxy(any(), eq(objectName), any(), any());
-
- server1 = lsRule.startServerVM(SERVER_1_VM_INDEX, locator1.getPort());
-
- server1.invoke(() -> {
- InternalCache cache1 = ClusterStartupRule.getCache();
- cache1.createRegionFactory(RegionShortcut.REPLICATE).create(REGION_NAME);
- });
-
- await().until(() -> bb.getMailbox(bbKey) != null);
- Object e = bb.getMailbox("sync1");
-
- assertThat(e).isNotInstanceOf(NullPointerException.class);
- assertThat((String) e).contains("this is fine");
- }
-}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/FederatingManagerIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/FederatingManagerIntegrationTest.java
deleted file mode 100644
index f5cbaad..0000000
--- a/geode-core/src/integrationTest/java/org/apache/geode/management/FederatingManagerIntegrationTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.management;
-
-import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.net.InetAddress;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.management.internal.FederatingManager;
-import org.apache.geode.management.internal.MemberMessenger;
-import org.apache.geode.management.internal.SystemManagementService;
-import org.apache.geode.test.junit.categories.JMXTest;
-import org.apache.geode.test.junit.rules.ServerStarterRule;
-
-@Category({JMXTest.class})
-public class FederatingManagerIntegrationTest {
-
- @Rule
- public ServerStarterRule serverRule = new ServerStarterRule();
-
- @Test
- public void testFederatingManagerConcurrency() throws Exception {
- serverRule.startServer();
- SystemManagementService service =
- (SystemManagementService) ManagementService
- .getExistingManagementService(serverRule.getCache());
- service.createManager();
- FederatingManager manager = service.getFederatingManager();
-
- MemberMessenger messenger = mock(MemberMessenger.class);
- manager.setMessenger(messenger);
-
- manager.startManager();
-
- InternalDistributedMember mockMember = mock(InternalDistributedMember.class);
- when(mockMember.getInetAddress()).thenReturn(InetAddress.getLocalHost());
- when(mockMember.getId()).thenReturn("member-1");
-
- for (int i = 0; i < 100; i++) {
- manager.addMember(mockMember);
- }
-
- await()
- .until(() -> serverRule.getCache().getAllRegions().size() > 1);
- assertThat(manager.getAndResetLatestException()).isNull();
- }
-}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java
new file mode 100644
index 0000000..355e3ec
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal;
+
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.internal.net.SocketCreator.getLocalHost;
+import static org.apache.geode.management.internal.SystemManagementService.FEDERATING_MANAGER_FACTORY_PROPERTY;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.ExecutorService;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.statistics.StatisticsClock;
+import org.apache.geode.management.ManagementService;
+import org.apache.geode.test.junit.categories.JMXTest;
+
+@Category(JMXTest.class)
+public class FederatingManagerIntegrationTest {
+
+ private InternalCache cache;
+ private FederatingManager federatingManager;
+
+ @Rule
+ public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
+ @Before
+ public void setUp() {
+ System.setProperty(FEDERATING_MANAGER_FACTORY_PROPERTY,
+ FederatingManagerFactoryWithMockMessenger.class.getName());
+
+ cache = (InternalCache) new CacheFactory()
+ .set(LOCATORS, "")
+ .create();
+
+ SystemManagementService managementService =
+ (SystemManagementService) ManagementService.getExistingManagementService(cache);
+ managementService.createManager();
+ federatingManager = managementService.getFederatingManager();
+ federatingManager.startManager();
+ }
+
+ @After
+ public void tearDown() {
+ cache.close();
+ }
+
+ @Test
+ public void testFederatingManagerConcurrency() throws UnknownHostException {
+ InternalDistributedMember member = member();
+
+ for (int i = 1; i <= 100; i++) {
+ federatingManager.addMember(member);
+ }
+
+ await().until(() -> !cache.getAllRegions().isEmpty());
+
+ assertThat(federatingManager.getAndResetLatestException()).isNull();
+ }
+
+ private InternalDistributedMember member() throws UnknownHostException {
+ InternalDistributedMember member = mock(InternalDistributedMember.class);
+ when(member.getInetAddress()).thenReturn(getLocalHost());
+ when(member.getId()).thenReturn("member-1");
+ return member;
+ }
+
+ private static class FederatingManagerFactoryWithMockMessenger
+ implements FederatingManagerFactory {
+
+ public FederatingManagerFactoryWithMockMessenger() {
+ // must be public for instantiation by reflection
+ }
+
+ @Override
+ public FederatingManager create(ManagementResourceRepo repo, InternalDistributedSystem system,
+ SystemManagementService service, InternalCache cache, StatisticsFactory statisticsFactory,
+ StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory, MemberMessenger messenger,
+ ExecutorService executorService) {
+ return new FederatingManager(repo, system, service, cache, statisticsFactory,
+ statisticsClock, proxyFactory, mock(MemberMessenger.class), executorService);
+ }
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/RegionDestroyedException.java b/geode-core/src/main/java/org/apache/geode/cache/RegionDestroyedException.java
index 66abcf8..7a748b1 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/RegionDestroyedException.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/RegionDestroyedException.java
@@ -18,15 +18,15 @@
* Indicates that the region has been destroyed. Further operations on the region object are not
* allowed.
*
- *
* @since GemFire 2.0
*/
public class RegionDestroyedException extends CacheRuntimeException {
private static final long serialVersionUID = 319804842308010754L;
- private String regionFullPath;
+
+ private final String regionFullPath;
/**
- * Constructs a <code>RegionDestroyedException</code> with a message.
+ * Constructs a {@code RegionDestroyedException} with a message.
*
* @param msg the String message
*/
@@ -36,7 +36,7 @@
}
/**
- * Constructs a <code>RegionDestroyedException</code> with a message and a cause.
+ * Constructs a {@code RegionDestroyedException} with a message and a cause.
*
* @param s the String message
* @param ex the Throwable cause
@@ -47,6 +47,6 @@
}
public String getRegionFullPath() {
- return this.regionFullPath;
+ return regionFullPath;
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
index ca735c0..c5168a2 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
@@ -40,6 +40,7 @@
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.TimeoutException;
@@ -51,7 +52,6 @@
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.LoggingExecutors;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.management.ManagementException;
@@ -65,7 +65,10 @@
* @since GemFire 7.0
*/
public class FederatingManager extends Manager {
- public static final Logger logger = LogService.getLogger();
+ private static final Logger logger = LogService.getLogger();
+
+ private final SystemManagementService service;
+ private final AtomicReference<Exception> latestException = new AtomicReference<>();
/**
* This Executor uses a pool of thread to execute the member addition /removal tasks, This will
@@ -73,31 +76,19 @@
* unbounded in practical situation as number of members will be a finite set at any given point
* of time
*/
- private ExecutorService pooledMembershipExecutor;
+ private final ExecutorService executorService;
+ private final MBeanProxyFactory proxyFactory;
+ private final MemberMessenger messenger;
- /**
- * Proxy factory is used to create , remove proxies
- */
- private MBeanProxyFactory proxyFactory;
-
- private MemberMessenger messenger;
-
- private final SystemManagementService service;
-
- private final AtomicReference<Exception> latestException = new AtomicReference<>(null);
-
- FederatingManager(MBeanJMXAdapter jmxAdapter, ManagementResourceRepo repo,
- InternalDistributedSystem system, SystemManagementService service, InternalCache cache,
- StatisticsFactory statisticsFactory, StatisticsClock statisticsClock) {
+ FederatingManager(ManagementResourceRepo repo, InternalDistributedSystem system,
+ SystemManagementService service, InternalCache cache, StatisticsFactory statisticsFactory,
+ StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory, MemberMessenger messenger,
+ ExecutorService executorService) {
super(repo, system, cache, statisticsFactory, statisticsClock);
this.service = service;
- proxyFactory = new MBeanProxyFactory(jmxAdapter, service);
- messenger = new MemberMessenger(jmxAdapter, system);
- }
-
- @VisibleForTesting
- void setProxyFactory(MBeanProxyFactory newProxyFactory) {
- proxyFactory = newProxyFactory;
+ this.proxyFactory = proxyFactory;
+ this.messenger = messenger;
+ this.executorService = executorService;
}
/**
@@ -111,9 +102,6 @@
logger.debug("Starting the Federating Manager.... ");
}
- pooledMembershipExecutor = LoggingExecutors.newFixedThreadPool("FederatingManager", true,
- Runtime.getRuntime().availableProcessors());
-
running = true;
startManagingActivity();
messenger.broadcastManagerInfo();
@@ -136,46 +124,13 @@
stopManagingActivity();
}
- /**
- * This method will be invoked whenever a member stops being a managing node. The
- * {@code ManagementException} has to be handled by the caller.
- */
- private void stopManagingActivity() {
- try {
- pooledMembershipExecutor.shutdownNow();
-
- for (DistributedMember distributedMember : repo.getMonitoringRegionMap().keySet()) {
- removeMemberArtifacts(distributedMember, false);
- }
- } catch (Exception e) {
- throw new ManagementException(e);
- }
- }
-
@Override
public boolean isRunning() {
return running;
}
- /**
- * This method will be invoked from MembershipListener which is registered when the member becomes
- * a Management node.
- *
- * <p>
- * This method will delegate task to another thread and exit, so that it wont block the membership
- * listener
- */
- @VisibleForTesting
- public void addMember(InternalDistributedMember member) {
- GIITask giiTask = new GIITask(member);
- executeTask(() -> {
- try {
- giiTask.call();
- } catch (RuntimeException e) {
- logger.warn("Error federating new member {}", member.getId(), e);
- latestException.set(e);
- }
- });
+ public MemberMessenger getMessenger() {
+ return messenger;
}
/**
@@ -190,38 +145,6 @@
executeTask(new RemoveMemberTask(member, crashed));
}
- private void executeTask(Runnable task) {
- try {
- pooledMembershipExecutor.execute(task);
- } catch (RejectedExecutionException ignored) {
- // Ignore, we are getting shutdown
- }
- }
-
- private void removeMemberArtifacts(DistributedMember member, boolean crashed) {
- Region<String, Object> proxyRegion = repo.getEntryFromMonitoringRegionMap(member);
- Region<NotificationKey, Notification> notificationRegion =
- repo.getEntryFromNotifRegionMap(member);
-
- if (proxyRegion == null && notificationRegion == null) {
- return;
- }
-
- repo.romoveEntryFromMonitoringRegionMap(member);
- repo.removeEntryFromNotifRegionMap(member);
-
- // If cache is closed all the regions would have been destroyed implicitly
- if (!cache.isClosed()) {
- proxyFactory.removeAllProxies(member, proxyRegion);
- proxyRegion.localDestroyRegion();
- notificationRegion.localDestroyRegion();
- }
-
- if (!system.getDistributedMember().equals(member)) {
- service.memberDeparted((InternalDistributedMember) member, crashed);
- }
- }
-
/**
* This method will be invoked from MembershipListener which is registered when the member becomes
* a Management node.
@@ -236,6 +159,66 @@
}
/**
+ * This will return the last updated time of the proxyMBean.
+ *
+ * @param objectName {@link ObjectName} of the MBean
+ *
+ * @return last updated time of the proxy
+ */
+ long getLastUpdateTime(ObjectName objectName) {
+ return proxyFactory.getLastUpdateTime(objectName);
+ }
+
+ /**
+ * Find a particular proxy instance for a {@link ObjectName}, {@link DistributedMember} and
+ * interface class If the proxy interface does not implement the given interface class a
+ * {@link ClassCastException} will be thrown
+ *
+ * @param objectName {@link ObjectName} of the MBean
+ * @param interfaceClass interface class implemented by proxy
+ *
+ * @return an instance of proxy exposing the given interface
+ */
+ <T> T findProxy(ObjectName objectName, Class<T> interfaceClass) {
+ return proxyFactory.findProxy(objectName, interfaceClass);
+ }
+
+ /**
+ * Find a set of proxies given a {@link DistributedMember}.
+ *
+ * @param member {@link DistributedMember}
+ *
+ * @return a set of {@link ObjectName}
+ */
+ Set<ObjectName> findAllProxies(DistributedMember member) {
+ return proxyFactory.findAllProxies(member);
+ }
+
+ /**
+ * This method will be invoked whenever a member stops being a managing node. The
+ * {@code ManagementException} has to be handled by the caller.
+ */
+ private void stopManagingActivity() {
+ try {
+ executorService.shutdownNow();
+
+ for (DistributedMember distributedMember : repo.getMonitoringRegionMap().keySet()) {
+ removeMemberArtifacts(distributedMember, false);
+ }
+ } catch (Exception e) {
+ throw new ManagementException(e);
+ }
+ }
+
+ private void executeTask(Runnable task) {
+ try {
+ executorService.execute(task);
+ } catch (RejectedExecutionException ignored) {
+ // Ignore, we are getting shutdown
+ }
+ }
+
+ /**
* This method will be invoked when a node transitions from managed node to managing node This
* method will block for all GIIs to be completed But each GII is given a specific time frame.
* After that the task will be marked as cancelled.
@@ -255,7 +238,7 @@
logger.debug("Management Resource creation started : ");
}
List<Future<InternalDistributedMember>> futureTaskList =
- pooledMembershipExecutor.invokeAll(giiTaskList);
+ executorService.invokeAll(giiTaskList);
for (Future<InternalDistributedMember> futureTask : futureTaskList) {
try {
@@ -301,60 +284,70 @@
}
/**
- * For internal Use only
+ * This method will be invoked from MembershipListener which is registered when the member becomes
+ * a Management node.
+ *
+ * <p>
+ * This method will delegate task to another thread and exit, so that it wont block the membership
+ * listener
*/
@VisibleForTesting
+ void addMember(InternalDistributedMember member) {
+ GIITask giiTask = new GIITask(member);
+ executeTask(() -> {
+ try {
+ giiTask.call();
+ } catch (RuntimeException e) {
+ logger.warn("Error federating new member {}", member.getId(), e);
+ latestException.set(e);
+ }
+ });
+ }
+
+ @VisibleForTesting
+ void removeMemberArtifacts(DistributedMember member, boolean crashed) {
+ Region<String, Object> monitoringRegion = repo.getEntryFromMonitoringRegionMap(member);
+ Region<NotificationKey, Notification> notificationRegion =
+ repo.getEntryFromNotifRegionMap(member);
+
+ if (monitoringRegion == null && notificationRegion == null) {
+ return;
+ }
+
+ repo.romoveEntryFromMonitoringRegionMap(member);
+ repo.removeEntryFromNotifRegionMap(member);
+
+ // If cache is closed all the regions would have been destroyed implicitly
+ if (!cache.isClosed()) {
+ try {
+ proxyFactory.removeAllProxies(member, monitoringRegion);
+ } catch (RegionDestroyedException ignore) {
+ // ignored
+ }
+ try {
+ monitoringRegion.localDestroyRegion();
+ } catch (RegionDestroyedException ignore) {
+ // ignored
+ }
+ try {
+ notificationRegion.localDestroyRegion();
+ } catch (RegionDestroyedException ignore) {
+ // ignored
+ }
+ }
+
+ if (!system.getDistributedMember().equals(member)) {
+ service.memberDeparted((InternalDistributedMember) member, crashed);
+ }
+ }
+
+ @VisibleForTesting
public MBeanProxyFactory getProxyFactory() {
return proxyFactory;
}
- /**
- * This will return the last updated time of the proxyMBean.
- *
- * @param objectName {@link ObjectName} of the MBean
- *
- * @return last updated time of the proxy
- */
- long getLastUpdateTime(ObjectName objectName) {
- return proxyFactory.getLastUpdateTime(objectName);
- }
-
- /**
- * Find a particular proxy instance for a {@link ObjectName}, {@link DistributedMember} and
- * interface class If the proxy interface does not implement the given interface class a
- * {@link ClassCastException} will be thrown
- *
- * @param objectName {@link ObjectName} of the MBean
- * @param interfaceClass interface class implemented by proxy
- *
- * @return an instance of proxy exposing the given interface
- */
- <T> T findProxy(ObjectName objectName, Class<T> interfaceClass) {
- return proxyFactory.findProxy(objectName, interfaceClass);
- }
-
- /**
- * Find a set of proxies given a {@link DistributedMember}.
- *
- * @param member {@link DistributedMember}
- *
- * @return a set of {@link ObjectName}
- */
- Set<ObjectName> findAllProxies(DistributedMember member) {
- return proxyFactory.findAllProxies(member);
- }
-
- public MemberMessenger getMessenger() {
- return messenger;
- }
-
@VisibleForTesting
- public void setMessenger(MemberMessenger messenger) {
- this.messenger = messenger;
- }
-
- @VisibleForTesting
- public synchronized Exception getAndResetLatestException() {
+ synchronized Exception getAndResetLatestException() {
return latestException.getAndSet(null);
}
@@ -500,7 +493,7 @@
// Before completing task intimate all listening ProxyListener which might send
// notifications.
- service.memberJoined((InternalDistributedMember) member);
+ service.memberJoined(member);
// Send manager info to the added member
messenger.sendManagerInfo(member);
@@ -538,7 +531,7 @@
private final InternalDistributedMember member;
- GIITask(InternalDistributedMember member) {
+ private GIITask(InternalDistributedMember member) {
this.member = member;
}
@@ -554,7 +547,7 @@
private final DistributedMember member;
private final boolean crashed;
- RemoveMemberTask(DistributedMember member, boolean crashed) {
+ private RemoveMemberTask(DistributedMember member, boolean crashed) {
this.member = member;
this.crashed = crashed;
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManagerFactory.java b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManagerFactory.java
new file mode 100644
index 0000000..ed2e62b
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManagerFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.management.internal;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.statistics.StatisticsClock;
+
+@FunctionalInterface
+interface FederatingManagerFactory {
+
+ FederatingManager create(ManagementResourceRepo repo, InternalDistributedSystem system,
+ SystemManagementService service, InternalCache cache, StatisticsFactory statisticsFactory,
+ StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory, MemberMessenger messenger,
+ ExecutorService executorService);
+}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java b/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java
index 5962d29..7a48465 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java
@@ -15,6 +15,7 @@
package org.apache.geode.management.internal;
import org.apache.geode.StatisticsFactory;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalCacheForClientAccess;
@@ -34,12 +35,12 @@
/**
* depicts whether this node is a Managing node or not
*/
- protected volatile boolean running = false;
+ protected volatile boolean running;
/**
* depicts whether this node is a Managing node or not
*/
- protected volatile boolean stopCacheOps = false;
+ protected volatile boolean stopCacheOps;
/**
* This is a single window to manipulate region resources for management
@@ -70,9 +71,7 @@
public abstract void stopManager();
- /**
- * For internal use only
- */
+ @VisibleForTesting
public ManagementResourceRepo getManagementResourceRepo() {
return repo;
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java b/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
index 79d99d4..9ba9be1 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
@@ -14,11 +14,17 @@
*/
package org.apache.geode.management.internal;
+import static java.util.Arrays.asList;
+import static java.util.stream.Collectors.toSet;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.stream.Collectors;
+import java.util.concurrent.ExecutorService;
import javax.management.Notification;
import javax.management.ObjectName;
@@ -27,6 +33,8 @@
import org.apache.geode.CancelException;
import org.apache.geode.StatisticsFactory;
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
@@ -36,6 +44,7 @@
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalCacheForClientAccess;
import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.LoggingExecutors;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.management.AlreadyRunningException;
import org.apache.geode.management.AsyncEventQueueMXBean;
@@ -52,7 +61,6 @@
import org.apache.geode.management.ManagerMXBean;
import org.apache.geode.management.MemberMXBean;
import org.apache.geode.management.RegionMXBean;
-import org.apache.geode.management.internal.beans.ManagementAdapter;
import org.apache.geode.management.membership.MembershipEvent;
import org.apache.geode.management.membership.MembershipListener;
@@ -65,53 +73,33 @@
public class SystemManagementService extends BaseManagementService {
private static final Logger logger = LogService.getLogger();
+ @Immutable
+ @VisibleForTesting
+ static final String FEDERATING_MANAGER_FACTORY_PROPERTY = "FEDERATING_MANAGER_FACTORY";
+
/**
* The concrete implementation of DistributedSystem that provides internal-only functionality.
*/
private final InternalDistributedSystem system;
/**
- * core component for distribution
- */
- private LocalManager localManager;
-
- /**
* This is a notification hub to listen all the notifications emitted from all the MBeans in a
* peer cache./cache server
*/
private final NotificationHub notificationHub;
/**
- * whether the service is closed or not if cache is closed automatically this service will be
- * closed
- */
- private volatile boolean closed = false;
-
- /**
- * has the management service has started yet
- */
- private volatile boolean isStarted = false;
-
- /**
* Adapter to interact with platform MBean server
*/
private final MBeanJMXAdapter jmxAdapter;
private final InternalCacheForClientAccess cache;
- private FederatingManager federatingManager;
-
private final ManagementAgent agent;
private final ManagementResourceRepo repo;
/**
- * This membership listener will listen on membership events after the node has transformed into a
- * Managing node.
- */
- private ManagementMembershipListener listener;
-
- /**
* Proxy aggregator to create aggregate MBeans e.g. DistributedSystem and DistributedRegion
* GemFire comes with a default aggregator.
*/
@@ -122,19 +110,38 @@
private final StatisticsFactory statisticsFactory;
private final StatisticsClock statisticsClock;
+ private final FederatingManagerFactory federatingManagerFactory;
- public static BaseManagementService newSystemManagementService(
+ /**
+ * whether the service is closed or not if cache is closed automatically this service will be
+ * closed
+ */
+ private volatile boolean closed;
+
+ /**
+ * has the management service has started yet
+ */
+ private volatile boolean isStarted;
+
+ private LocalManager localManager;
+
+ private FederatingManager federatingManager;
+
+ /**
+ * This membership listener will listen on membership events after the node has transformed into a
+ * Managing node.
+ */
+ private ManagementMembershipListener listener;
+
+ static BaseManagementService newSystemManagementService(
InternalCacheForClientAccess cache) {
return new SystemManagementService(cache).init();
}
- protected SystemManagementService(InternalCacheForClientAccess cache) {
+ private SystemManagementService(InternalCacheForClientAccess cache) {
this.cache = cache;
- this.system = cache.getInternalDistributedSystem();
- // This is a safe check to ensure Management service does not start for a
- // system which is disconnected.
- // Most likely scenario when this will happen is when a cache is closed and we are at this
- // point.
+ system = cache.getInternalDistributedSystem();
+
if (!system.isConnected()) {
throw new DistributedSystemDisconnectedException(
"This connection to a distributed system has been disconnected.");
@@ -142,116 +149,49 @@
statisticsFactory = system.getStatisticsManager();
statisticsClock = cache.getStatisticsClock();
+ jmxAdapter = new MBeanJMXAdapter(system.getDistributedMember());
+ repo = new ManagementResourceRepo();
+ notificationHub = new NotificationHub(repo);
- this.jmxAdapter = new MBeanJMXAdapter(this.system.getDistributedMember());
- this.repo = new ManagementResourceRepo();
-
- this.notificationHub = new NotificationHub(repo);
if (system.getConfig().getJmxManager()) {
- this.agent = new ManagementAgent(system.getConfig(), cache);
+ agent = new ManagementAgent(system.getConfig(), cache);
} else {
- this.agent = null;
+ agent = null;
}
- ManagementFunction function = new ManagementFunction(notificationHub);
- FunctionService.registerFunction(function);
- this.proxyListeners = new CopyOnWriteArrayList<>();
- }
- /**
- * This method will initialize all the internal components for Management and Monitoring
- *
- * It will a)start an JMX connectorServer b) create a notification hub c)register the
- * ManagementFunction
- */
- private SystemManagementService init() {
- try {
- this.localManager =
- new LocalManager(repo, system, this, cache, statisticsFactory, statisticsClock);
- this.localManager.startManager();
- this.listener = new ManagementMembershipListener(this);
- system.getDistributionManager().addMembershipListener(listener);
- isStarted = true;
- return this;
- } catch (CancelException e) {
- // Rethrow all CancelExceptions (fix for defect 46339)
- throw e;
- } catch (Exception e) {
- // Wrap all other exceptions as ManagementExceptions
- logger.error(e.getMessage(), e);
- throw new ManagementException(e);
- }
- }
+ FunctionService.registerFunction(new ManagementFunction(notificationHub));
- /**
- * For internal Use only
- */
- public LocalManager getLocalManager() {
- return localManager;
- }
-
- public NotificationHub getNotificationHub() {
- return notificationHub;
- }
-
- public FederatingManager getFederatingManager() {
- return federatingManager;
- }
-
- public MBeanJMXAdapter getJMXAdapter() {
- return jmxAdapter;
- }
-
- public ManagementAgent getManagementAgent() {
- return agent;
- }
-
- public boolean isStartedAndOpen() {
- return isStarted && !closed && system.isConnected();
- }
-
- private void verifyManagementService() {
- if (!isStarted) {
- throw new ManagementException(
- "Management Service Not Started Yet");
- }
- if (!system.isConnected()) {
- throw new ManagementException(
- "Not Connected To Distributed System");
- }
- if (closed) {
- throw new ManagementException(
- "Management Service Is Closed");
- }
+ proxyListeners = new CopyOnWriteArrayList<>();
+ federatingManagerFactory = createFederatingManagerFactory();
}
@Override
public void close() {
synchronized (instances) {
if (closed) {
- // its a no op, hence not logging any exception
return;
}
+
if (logger.isDebugEnabled()) {
logger.debug("Closing Management Service");
}
if (listener != null && system.isConnected()) {
system.getDistributionManager().removeMembershipListener(listener);
}
- // Stop the Federating Manager first . It will ensure MBeans are not getting federated.
- // while un-registering
+ // Stop the Federating Manager first to avoid federating while un-registering
if (federatingManager != null && federatingManager.isRunning()) {
federatingManager.stopManager();
}
- this.notificationHub.cleanUpListeners();
+ notificationHub.cleanUpListeners();
jmxAdapter.cleanJMXResource();
if (localManager.isRunning()) {
localManager.stopManager();
}
- if (this.agent != null && this.agent.isRunning()) {
- this.agent.stopAgent();
+ if (agent != null && agent.isRunning()) {
+ agent.stopAgent();
}
- getInternalCache().getJmxManagerAdvisor().broadcastChange();
+ cache.getJmxManagerAdvisor().broadcastChange();
instances.remove(cache);
localManager = null;
closed = true;
@@ -262,31 +202,28 @@
public <T> void federate(ObjectName objectName, Class<T> interfaceClass,
boolean notificationEmitter) {
verifyManagementService();
- if (!objectName.getDomain().equalsIgnoreCase(ManagementConstants.OBJECTNAME__DEFAULTDOMAIN)) {
- throw new ManagementException(
- "Not A GemFire Domain MBean, can not Federate");
- }
+ if (!objectName.getDomain().equalsIgnoreCase(ManagementConstants.OBJECTNAME__DEFAULTDOMAIN)) {
+ throw new ManagementException("Not A GemFire Domain MBean, can not Federate");
+ }
if (!jmxAdapter.isRegistered(objectName)) {
- throw new ManagementException(
- "MBean Not Registered In GemFire Domain");
+ throw new ManagementException("MBean Not Registered In GemFire Domain");
}
if (notificationEmitter && !jmxAdapter.hasNotificationSupport(objectName)) {
- throw new ManagementException(
- "MBean Does Not Have Notification Support");
+ throw new ManagementException("MBean Does Not Have Notification Support");
}
// All validation Passed. Now create the federation Component
Object object = jmxAdapter.getMBeanObject(objectName);
- FederationComponent fedComp =
+ FederationComponent federationComponent =
new FederationComponent(object, objectName, interfaceClass, notificationEmitter);
- if (ManagementAdapter.refreshOnInit.contains(interfaceClass)) {
- fedComp.refreshObjectState(true);// Fixes 46387
+ if (asList(RegionMXBean.class, MemberMXBean.class).contains(interfaceClass)) {
+ federationComponent.refreshObjectState(true);
}
- localManager.markForFederation(objectName, fedComp);
+ localManager.markForFederation(objectName, federationComponent);
if (isManager()) {
- afterCreateProxy(objectName, interfaceClass, object, fedComp);
+ afterCreateProxy(objectName, interfaceClass, object, federationComponent);
}
}
@@ -302,8 +239,8 @@
}
if (federatingManager == null) {
return 0;
-
- } else if (!federatingManager.isRunning()) {
+ }
+ if (!federatingManager.isRunning()) {
return 0;
}
if (jmxAdapter.isLocalMBean(objectName)) {
@@ -327,20 +264,6 @@
return jmxAdapter.getLocalRegionMXBean(regionPath);
}
- public <T> T getMBeanProxy(ObjectName objectName, Class<T> interfaceClass) {
- if (!isStartedAndOpen()) {
- return null;
- }
- if (federatingManager == null) {
- return null;
-
- } else if (!federatingManager.isRunning()) {
- return null;
- }
-
- return federatingManager.findProxy(objectName, interfaceClass);
- }
-
@Override
public MemberMXBean getMemberMXBean() {
return jmxAdapter.getMemberMXBean();
@@ -353,22 +276,21 @@
}
if (system.getDistributedMember().equals(member)) {
return jmxAdapter.getLocalGemFireMBean().keySet();
- } else {
- if (federatingManager == null) {
- return Collections.emptySet();
-
- } else if (!federatingManager.isRunning()) {
- return Collections.emptySet();
- }
- return federatingManager.findAllProxies(member);
}
+ if (federatingManager == null) {
+ return Collections.emptySet();
+ }
+ if (!federatingManager.isRunning()) {
+ return Collections.emptySet();
+ }
+ return federatingManager.findAllProxies(member);
}
@Override
public Set<ObjectName> getAsyncEventQueueMBeanNames(DistributedMember member) {
- Set<ObjectName> mBeanNames = this.queryMBeanNames(member);
- return mBeanNames.stream().filter(x -> "AsyncEventQueue".equals(x.getKeyProperty("service")))
- .collect(Collectors.toSet());
+ return queryMBeanNames(member).stream()
+ .filter(x -> "AsyncEventQueue".equals(x.getKeyProperty("service")))
+ .collect(toSet());
}
@Override
@@ -377,24 +299,20 @@
return jmxAdapter.registerMBean(object, objectName, false);
}
- public ObjectName registerInternalMBean(Object object, ObjectName objectName) {
- verifyManagementService();
- return jmxAdapter.registerMBean(object, objectName, true);
- }
-
@Override
public void unregisterMBean(ObjectName objectName) {
if (!isStartedAndOpen()) {
return;
}
+
verifyManagementService();
if (isManager()) {
- FederationComponent removedObj = localManager.getFedComponents().get(objectName);
- if (removedObj != null) { // only for MBeans local to Manager , not
- // proxies
- afterRemoveProxy(objectName, removedObj.getInterfaceClass(), removedObj.getMBeanObject(),
- removedObj);
+ FederationComponent removed = localManager.getFedComponents().get(objectName);
+ if (removed != null) {
+ // only for MBeans local to Manager, not proxies
+ afterRemoveProxy(objectName, removed.getInterfaceClass(), removed.getMBeanObject(),
+ removed);
}
}
@@ -407,19 +325,16 @@
return isManagerCreated() && federatingManager.isRunning();
}
- public boolean isManagerCreated() {
- return isStartedAndOpen() && federatingManager != null;
- }
-
@Override
public void startManager() {
- if (!getInternalCache().getInternalDistributedSystem().getConfig().getJmxManager()) {
- // fix for 45900
+ if (!cache.getInternalDistributedSystem().getConfig().getJmxManager()) {
throw new ManagementException(
"Could not start the manager because the gemfire property \"jmx-manager\" is false.");
}
+
synchronized (instances) {
verifyManagementService();
+
if (federatingManager != null && federatingManager.isRunning()) {
throw new AlreadyRunningException(
"Manager is already running");
@@ -432,15 +347,16 @@
} else if (!federatingManager.isRunning()) {
needsToBeStarted = true;
}
+
if (needsToBeStarted) {
boolean started = false;
try {
system.handleResourceEvent(ResourceEvent.MANAGER_START, null);
federatingManager.startManager();
- if (this.agent != null) {
- this.agent.startAgent();
+ if (agent != null) {
+ agent.startAgent();
}
- getInternalCache().getJmxManagerAdvisor().broadcastChange();
+ cache.getJmxManagerAdvisor().broadcastChange();
started = true;
} catch (RuntimeException | Error e) {
logger.error("Jmx manager could not be started because {}", e.getMessage(), e);
@@ -457,27 +373,6 @@
}
}
- private InternalCache getInternalCache() {
- return this.cache;
- }
-
- /**
- * Creates a Manager instance in stopped state.
- */
- public boolean createManager() {
- synchronized (instances) {
- if (federatingManager != null) {
- return false;
- }
- system.handleResourceEvent(ResourceEvent.MANAGER_CREATE, null);
- // An initialised copy of federating manager
- federatingManager = new FederatingManager(jmxAdapter, repo, system, this, cache,
- statisticsFactory, statisticsClock);
- getInternalCache().getJmxManagerAdvisor().broadcastChange();
- return true;
- }
- }
-
/**
* It will stop the federating Manager and restart the Local cache operation
*/
@@ -488,9 +383,9 @@
if (federatingManager != null) {
federatingManager.stopManager();
system.handleResourceEvent(ResourceEvent.MANAGER_STOP, null);
- getInternalCache().getJmxManagerAdvisor().broadcastChange();
- if (this.agent != null && this.agent.isRunning()) {
- this.agent.stopAgent();
+ cache.getJmxManagerAdvisor().broadcastChange();
+ if (agent != null && agent.isRunning()) {
+ agent.stopAgent();
}
}
}
@@ -516,18 +411,6 @@
return jmxAdapter.getDistributedSystemMXBean();
}
- public void addProxyListener(ProxyListener listener) {
- this.proxyListeners.add(listener);
- }
-
- public void removeProxyListener(ProxyListener listener) {
- this.proxyListeners.remove(listener);
- }
-
- public List<ProxyListener> getProxyListeners() {
- return this.proxyListeners;
- }
-
@Override
public ManagerMXBean getManagerMXBean() {
return jmxAdapter.getManagerMXBean();
@@ -618,36 +501,102 @@
return jmxAdapter.getLocatorMXBean();
}
- public boolean afterCreateProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
+ @Override
+ public <T> T getMBeanInstance(ObjectName objectName, Class<T> interfaceClass) {
+ if (jmxAdapter.isLocalMBean(objectName)) {
+ return jmxAdapter.findMBeanByName(objectName, interfaceClass);
+ }
+ return getMBeanProxy(objectName, interfaceClass);
+ }
+
+ @Override
+ public void addMembershipListener(MembershipListener listener) {
+ universalListenerContainer.addMembershipListener(listener);
+ }
+
+ @Override
+ public void removeMembershipListener(MembershipListener listener) {
+ universalListenerContainer.removeMembershipListener(listener);
+ }
+
+ public LocalManager getLocalManager() {
+ return localManager;
+ }
+
+ public FederatingManager getFederatingManager() {
+ return federatingManager;
+ }
+
+ public MBeanJMXAdapter getJMXAdapter() {
+ return jmxAdapter;
+ }
+
+ public ManagementAgent getManagementAgent() {
+ return agent;
+ }
+
+ public <T> T getMBeanProxy(ObjectName objectName, Class<T> interfaceClass) {
+ if (!isStartedAndOpen()) {
+ return null;
+ }
+ if (federatingManager == null) {
+ return null;
+ }
+ if (!federatingManager.isRunning()) {
+ return null;
+ }
+ return federatingManager.findProxy(objectName, interfaceClass);
+ }
+
+ public ObjectName registerInternalMBean(Object object, ObjectName objectName) {
+ verifyManagementService();
+ return jmxAdapter.registerMBean(object, objectName, true);
+ }
+
+ public boolean isManagerCreated() {
+ return isStartedAndOpen() && federatingManager != null;
+ }
+
+ /**
+ * Creates a Manager instance in stopped state.
+ */
+ public boolean createManager() {
+ synchronized (instances) {
+ if (federatingManager != null) {
+ return false;
+ }
+ system.handleResourceEvent(ResourceEvent.MANAGER_CREATE, null);
+ // An initialised copy of federating manager
+ federatingManager = federatingManagerFactory.create(repo, system, this, cache,
+ statisticsFactory, statisticsClock, new MBeanProxyFactory(jmxAdapter, this),
+ new MemberMessenger(jmxAdapter, system),
+ LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+ Runtime.getRuntime().availableProcessors()));
+ cache.getJmxManagerAdvisor().broadcastChange();
+ return true;
+ }
+ }
+
+ public void addProxyListener(ProxyListener listener) {
+ proxyListeners.add(listener);
+ }
+
+ public void removeProxyListener(ProxyListener listener) {
+ proxyListeners.remove(listener);
+ }
+
+ public void afterCreateProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
FederationComponent newVal) {
for (ProxyListener listener : proxyListeners) {
listener.afterCreateProxy(objectName, interfaceClass, proxyObject, newVal);
}
- return true;
}
- public boolean afterPseudoCreateProxy(ObjectName objectName, Class interfaceClass,
- Object proxyObject, FederationComponent newVal) {
- for (ProxyListener listener : proxyListeners) {
- listener.afterPseudoCreateProxy(objectName, interfaceClass, proxyObject, newVal);
- }
- return true;
- }
-
- public boolean afterRemoveProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
+ public void afterRemoveProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
FederationComponent oldVal) {
for (ProxyListener listener : proxyListeners) {
listener.afterRemoveProxy(objectName, interfaceClass, proxyObject, oldVal);
}
- return true;
- }
-
- public boolean afterUpdateProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
- FederationComponent newVal, FederationComponent oldVal) {
- for (ProxyListener listener : proxyListeners) {
- listener.afterUpdateProxy(objectName, interfaceClass, proxyObject, newVal, oldVal);
- }
- return true;
}
public void handleNotification(Notification notification) {
@@ -656,52 +605,133 @@
}
}
- @Override
- public <T> T getMBeanInstance(ObjectName objectName, Class<T> interfaceClass) {
- if (jmxAdapter.isLocalMBean(objectName)) {
- return jmxAdapter.findMBeanByName(objectName, interfaceClass);
- } else {
- return this.getMBeanProxy(objectName, interfaceClass);
- }
- }
-
- public void logFine(String s) {
- if (logger.isDebugEnabled()) {
- logger.debug(s);
- }
- }
-
- public void memberJoined(InternalDistributedMember id) {
+ void memberJoined(InternalDistributedMember id) {
for (ProxyListener listener : proxyListeners) {
listener.memberJoined(system.getDistributionManager(), id);
}
}
- public void memberDeparted(InternalDistributedMember id, boolean crashed) {
+ void memberDeparted(InternalDistributedMember id, boolean crashed) {
for (ProxyListener listener : proxyListeners) {
listener.memberDeparted(system.getDistributionManager(), id, crashed);
}
}
- public void memberSuspect(InternalDistributedMember id, InternalDistributedMember whoSuspected,
+ void memberSuspect(InternalDistributedMember id, InternalDistributedMember whoSuspected,
String reason) {
for (ProxyListener listener : proxyListeners) {
listener.memberSuspect(system.getDistributionManager(), id, whoSuspected, reason);
}
}
- public void quorumLost(Set<InternalDistributedMember> failures,
- List<InternalDistributedMember> remaining) {
+ void afterPseudoCreateProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
+ FederationComponent newVal) {
for (ProxyListener listener : proxyListeners) {
- listener.quorumLost(system.getDistributionManager(), failures, remaining);
+ listener.afterPseudoCreateProxy(objectName, interfaceClass, proxyObject, newVal);
}
}
- public static class UniversalListenerContainer {
+ boolean isStartedAndOpen() {
+ return isStarted && !closed && system.isConnected();
+ }
- private List<MembershipListener> membershipListeners = new CopyOnWriteArrayList<>();
+ void afterUpdateProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
+ FederationComponent newVal, FederationComponent oldVal) {
+ for (ProxyListener listener : proxyListeners) {
+ listener.afterUpdateProxy(objectName, interfaceClass, proxyObject, newVal, oldVal);
+ }
+ }
- public void memberJoined(InternalDistributedMember id) {
+ UniversalListenerContainer getUniversalListenerContainer() {
+ return universalListenerContainer;
+ }
+
+ private void verifyManagementService() {
+ if (!isStarted) {
+ throw new ManagementException(
+ "Management Service Not Started Yet");
+ }
+ if (!system.isConnected()) {
+ throw new ManagementException(
+ "Not Connected To Distributed System");
+ }
+ if (closed) {
+ throw new ManagementException(
+ "Management Service Is Closed");
+ }
+ }
+
+ /**
+ * This method will initialize all the internal components for Management and Monitoring
+ *
+ * It will: <br>
+ * a) start an JMX connectorServer <br>
+ * b) create a notification hub <br>
+ * c) register the ManagementFunction
+ */
+ private SystemManagementService init() {
+ try {
+ localManager =
+ new LocalManager(repo, system, this, cache, statisticsFactory, statisticsClock);
+ listener = new ManagementMembershipListener(this);
+
+ localManager.startManager();
+ system.getDistributionManager().addMembershipListener(listener);
+ isStarted = true;
+ return this;
+ } catch (CancelException e) {
+ // Rethrow all CancelExceptions
+ throw e;
+ } catch (Exception e) {
+ // Wrap all other exceptions as ManagementExceptions
+ logger.error(e.getMessage(), e);
+ throw new ManagementException(e);
+ }
+ }
+
+ private static FederatingManagerFactory createFederatingManagerFactory() {
+ try {
+ String federatingManagerFactoryName =
+ System.getProperty(FEDERATING_MANAGER_FACTORY_PROPERTY,
+ FederatingManagerFactoryImpl.class.getName());
+ Class<? extends FederatingManagerFactory> federatingManagerFactoryClass =
+ Class.forName(federatingManagerFactoryName)
+ .asSubclass(FederatingManagerFactory.class);
+ Constructor<? extends FederatingManagerFactory> constructor =
+ federatingManagerFactoryClass.getConstructor();
+ return constructor.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException
+ | NoSuchMethodException | InvocationTargetException e) {
+ return new FederatingManagerFactoryImpl();
+ }
+ }
+
+ @VisibleForTesting
+ public NotificationHub getNotificationHub() {
+ return notificationHub;
+ }
+
+ private static class FederatingManagerFactoryImpl implements FederatingManagerFactory {
+
+ public FederatingManagerFactoryImpl() {
+ // must be public for instantiation by reflection
+ }
+
+ @Override
+ public FederatingManager create(ManagementResourceRepo repo, InternalDistributedSystem system,
+ SystemManagementService service, InternalCache cache, StatisticsFactory statisticsFactory,
+ StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory, MemberMessenger messenger,
+ ExecutorService executorService) {
+ return new FederatingManager(repo, system, service, cache, statisticsFactory,
+ statisticsClock, proxyFactory, messenger, executorService);
+ }
+ }
+
+ static class UniversalListenerContainer {
+
+ private final Collection<MembershipListener> membershipListeners = new CopyOnWriteArrayList<>();
+
+ void memberJoined(InternalDistributedMember id) {
MembershipEvent event = createEvent(id);
for (MembershipListener listener : membershipListeners) {
try {
@@ -713,18 +743,9 @@
}
}
- public void memberDeparted(InternalDistributedMember id, boolean crashed) {
+ void memberDeparted(InternalDistributedMember id, boolean crashed) {
MembershipEvent event = createEvent(id);
- if (!crashed) {
- for (MembershipListener listener : membershipListeners) {
- try {
- listener.memberLeft(event);
- } catch (Exception e) {
- logger.error("Could not invoke listener event memberLeft for listener[{}] due to ",
- listener.getClass(), e.getMessage(), e);
- }
- }
- } else {
+ if (crashed) {
for (MembershipListener listener : membershipListeners) {
try {
listener.memberCrashed(event);
@@ -733,32 +754,23 @@
listener.getClass(), e.getMessage(), e);
}
}
+ } else {
+ for (MembershipListener listener : membershipListeners) {
+ try {
+ listener.memberLeft(event);
+ } catch (Exception e) {
+ logger.error("Could not invoke listener event memberLeft for listener[{}] due to ",
+ listener.getClass(), e.getMessage(), e);
+ }
+ }
}
}
- private MembershipEvent createEvent(InternalDistributedMember id) {
- final String memberId = id.getId();
- final DistributedMember member = id;
-
- return new MembershipEvent() {
-
- @Override
- public String getMemberId() {
- return memberId;
- }
-
- @Override
- public DistributedMember getDistributedMember() {
- return member;
- }
- };
- }
-
/**
* Registers a listener that receives call backs when a member joins or leaves the distributed
* system.
*/
- public void addMembershipListener(MembershipListener listener) {
+ private void addMembershipListener(MembershipListener listener) {
membershipListeners.add(listener);
}
@@ -767,23 +779,23 @@
*
* @see #addMembershipListener
*/
- public void removeMembershipListener(MembershipListener listener) {
+ private void removeMembershipListener(MembershipListener listener) {
membershipListeners.remove(listener);
}
- }
- public UniversalListenerContainer getUniversalListenerContainer() {
- return universalListenerContainer;
- }
+ private MembershipEvent createEvent(DistributedMember id) {
+ return new MembershipEvent() {
- @Override
- public void addMembershipListener(MembershipListener listener) {
- universalListenerContainer.addMembershipListener(listener);
+ @Override
+ public String getMemberId() {
+ return id.getId();
+ }
- }
-
- @Override
- public void removeMembershipListener(MembershipListener listener) {
- universalListenerContainer.removeMembershipListener(listener);
+ @Override
+ public DistributedMember getDistributedMember() {
+ return id;
+ }
+ };
+ }
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
index 1327d6f..bd8702b 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
@@ -14,9 +14,68 @@
*/
package org.apache.geode.management.internal.beans;
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableList;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.geode.management.JMXNotificationType.ASYNC_EVENT_QUEUE_CLOSED;
+import static org.apache.geode.management.JMXNotificationType.ASYNC_EVENT_QUEUE_CREATED;
+import static org.apache.geode.management.JMXNotificationType.CACHE_SERVER_STARTED;
+import static org.apache.geode.management.JMXNotificationType.CACHE_SERVER_STOPPED;
+import static org.apache.geode.management.JMXNotificationType.CACHE_SERVICE_CREATED;
+import static org.apache.geode.management.JMXNotificationType.CLIENT_CRASHED;
+import static org.apache.geode.management.JMXNotificationType.CLIENT_JOINED;
+import static org.apache.geode.management.JMXNotificationType.CLIENT_LEFT;
+import static org.apache.geode.management.JMXNotificationType.DISK_STORE_CLOSED;
+import static org.apache.geode.management.JMXNotificationType.DISK_STORE_CREATED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_RECEIVER_CREATED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_RECEIVER_DESTROYED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_RECEIVER_STARTED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_RECEIVER_STOPPED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_SENDER_CREATED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_SENDER_PAUSED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_SENDER_REMOVED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_SENDER_RESUMED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_SENDER_STARTED;
+import static org.apache.geode.management.JMXNotificationType.GATEWAY_SENDER_STOPPED;
+import static org.apache.geode.management.JMXNotificationType.LOCATOR_STARTED;
+import static org.apache.geode.management.JMXNotificationType.LOCK_SERVICE_CLOSED;
+import static org.apache.geode.management.JMXNotificationType.LOCK_SERVICE_CREATED;
+import static org.apache.geode.management.JMXNotificationType.REGION_CLOSED;
+import static org.apache.geode.management.JMXNotificationType.REGION_CREATED;
+import static org.apache.geode.management.JMXNotificationType.SYSTEM_ALERT;
+import static org.apache.geode.management.JMXNotificationUserData.ALERT_LEVEL;
+import static org.apache.geode.management.JMXNotificationUserData.MEMBER;
+import static org.apache.geode.management.JMXNotificationUserData.THREAD;
+import static org.apache.geode.management.internal.AlertDetails.getAlertLevelAsString;
+import static org.apache.geode.management.internal.ManagementConstants.AGGREGATE_MBEAN_PATTERN;
+import static org.apache.geode.management.internal.ManagementConstants.ASYNC_EVENT_QUEUE_CLOSED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.ASYNC_EVENT_QUEUE_CREATED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.CACHE_SERVER_STARTED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.CACHE_SERVER_STOPPED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.CACHE_SERVICE_CREATED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.CLIENT_CRASHED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.CLIENT_JOINED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.CLIENT_LEFT_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.DISK_STORE_CLOSED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.DISK_STORE_CREATED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_RECEIVER_CREATED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_RECEIVER_DESTROYED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_RECEIVER_STARTED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_RECEIVER_STOPPED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_SENDER_CREATED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_SENDER_PAUSED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_SENDER_REMOVED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_SENDER_RESUMED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_SENDER_STARTED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.GATEWAY_SENDER_STOPPED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.LOCATOR_STARTED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.LOCK_SERVICE_CLOSED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.LOCK_SERVICE_CREATED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.REGION_CLOSED_PREFIX;
+import static org.apache.geode.management.internal.ManagementConstants.REGION_CREATED_PREFIX;
+
+import java.lang.management.ManagementFactory;
import java.lang.reflect.Type;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -30,18 +89,16 @@
import javax.management.ObjectInstance;
import javax.management.ObjectName;
-import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.apache.geode.annotations.Immutable;
-import org.apache.geode.annotations.internal.MakeNotStatic;
-import org.apache.geode.cache.Cache;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.wan.GatewayReceiver;
import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -51,7 +108,6 @@
import org.apache.geode.internal.ClassLoadUtil;
import org.apache.geode.internal.cache.CacheService;
import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.management.AsyncEventQueueMXBean;
@@ -59,8 +115,6 @@
import org.apache.geode.management.DiskStoreMXBean;
import org.apache.geode.management.GatewayReceiverMXBean;
import org.apache.geode.management.GatewaySenderMXBean;
-import org.apache.geode.management.JMXNotificationType;
-import org.apache.geode.management.JMXNotificationUserData;
import org.apache.geode.management.LocatorMXBean;
import org.apache.geode.management.LockServiceMXBean;
import org.apache.geode.management.ManagementException;
@@ -71,7 +125,6 @@
import org.apache.geode.management.internal.AlertDetails;
import org.apache.geode.management.internal.FederationComponent;
import org.apache.geode.management.internal.MBeanJMXAdapter;
-import org.apache.geode.management.internal.ManagementConstants;
import org.apache.geode.management.internal.SystemManagementService;
import org.apache.geode.management.membership.ClientMembership;
import org.apache.geode.management.membership.ClientMembershipEvent;
@@ -83,6 +136,7 @@
* Acts as an intermediate between MBean layer and Federation Layer. Handles all Call backs from
* GemFire to instantiate or remove MBeans from GemFire Domain.
*
+ * <p>
* Even though this class have a lot of utility functions it interacts with the state of the system
* and contains some state itself.
*/
@@ -90,95 +144,57 @@
private static final Logger logger = LogService.getLogger();
- /** Internal ManagementService Instance **/
- private SystemManagementService service;
-
- /** GemFire Cache impl **/
- private InternalCache internalCache;
-
- /** Member Name **/
- private String memberSource;
-
- /**
- * emitter is a helper class for sending notifications on behalf of the MemberMBean
- **/
- private NotificationBroadcasterSupport memberLevelNotifEmitter;
-
- /** The <code>MBeanServer</code> for this application */
- @MakeNotStatic
- public static final MBeanServer mbeanServer = MBeanJMXAdapter.mbeanServer;
-
- /** MemberMBean instance **/
- private MemberMBean memberBean;
-
- private volatile boolean serviceInitialised = false;
-
- @MakeNotStatic
- private MBeanAggregator aggregator;
-
@Immutable
- public static final List<Class> refreshOnInit;
+ private static final List<String> INTERNAL_LOCK_SERVICES =
+ unmodifiableList(asList(DLockService.DTLS, DLockService.LTLS,
+ PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME,
+ PeerTypeRegistration.LOCK_SERVICE_NAME));
- @Immutable
- public static final List<String> internalLocks;
-
- static {
- refreshOnInit =
- Collections.unmodifiableList(Arrays.asList(RegionMXBean.class, MemberMXBean.class));
-
- internalLocks = Collections.unmodifiableList(Arrays.asList(
- DLockService.DTLS, // From reserved lock service name
- DLockService.LTLS, // From reserved lock service name
- PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME,
- PeerTypeRegistration.LOCK_SERVICE_NAME));
- }
-
- protected MemberMBeanBridge memberMBeanBridge;
-
+ private final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
private final Object regionOpLock = new Object();
+ private SystemManagementService service;
+ private InternalCache internalCache;
+ private String memberSource;
+ private NotificationBroadcasterSupport memberLevelNotificationEmitter;
+ private MemberMBean memberBean;
+ private MBeanAggregator aggregator;
+ private MemberMBeanBridge memberMBeanBridge;
+
+ private volatile boolean serviceInitialised;
+
/**
- * Adapter life cycle is tied with the Cache . So its better to make all cache level artifacts as
+ * Adapter life cycle is tied with the Cache. So its better to make all cache level artifacts as
* instance variable
- *
- * @param cache gemfire cache
*/
- protected void handleCacheCreation(InternalCache cache) throws ManagementException {
+ void handleCacheCreation(InternalCache cache) throws ManagementException {
try {
- this.internalCache = cache;
- this.service =
- (SystemManagementService) ManagementService.getManagementService(internalCache);
+ internalCache = cache;
+ service = (SystemManagementService) ManagementService.getManagementService(internalCache);
- this.memberMBeanBridge = new MemberMBeanBridge(internalCache, service).init();
- this.memberBean = new MemberMBean(memberMBeanBridge);
- this.memberLevelNotifEmitter = memberBean;
+ memberMBeanBridge = new MemberMBeanBridge(internalCache, service).init();
+ memberBean = new MemberMBean(memberMBeanBridge);
+ memberLevelNotificationEmitter = memberBean;
+ memberSource = MBeanJMXAdapter.getMemberNameOrUniqueId(cache.getMyId());
- ObjectName memberMBeanName = MBeanJMXAdapter.getMemberMBeanName(
- InternalDistributedSystem.getConnectedInstance().getDistributedMember());
+ // Type casting to MemberMXBean to expose only those methods described in interface
+ ObjectName objectName = MBeanJMXAdapter.getMemberMBeanName(cache.getMyId());
+ ObjectName federatedName = service.registerInternalMBean(memberBean, objectName);
+ service.federate(federatedName, MemberMXBean.class, true);
- memberSource = MBeanJMXAdapter
- .getMemberNameOrUniqueId(internalCache.getDistributedSystem().getDistributedMember());
+ serviceInitialised = true;
- // Type casting to MemberMXBean to expose only those methods described in
- // the interface;
- ObjectName changedMBeanName = service.registerInternalMBean(memberBean, memberMBeanName);
- service.federate(changedMBeanName, MemberMXBean.class, true);
-
- this.serviceInitialised = true;
-
- // Service initialised is only for ManagementService and not necessarily
- // Manager service.
+ // Service initialised is only for ManagementService and not necessarily Manager service.
// For situations where locator is created before any cache is created
if (InternalLocator.hasLocator()) {
- Locator loc = InternalLocator.getLocator();
- handleLocatorStart(loc);
+ handleLocatorStart(InternalLocator.getLocator());
}
if (cache.getInternalDistributedSystem().getConfig().getJmxManager()) {
- this.service.createManager();
+ service.createManager();
if (cache.getInternalDistributedSystem().getConfig().getJmxManagerStart()) {
- this.service.startManager();
+ service.startManager();
}
}
@@ -201,48 +217,50 @@
/**
* Handles all the distributed mbean creation part when a Manager is started
*/
- protected void handleManagerStart() throws ManagementException {
+ void handleManagerStart() throws ManagementException {
if (!isServiceInitialised("handleManagerStart")) {
return;
}
- MBeanJMXAdapter jmxAdapter = service.getJMXAdapter();
- Map<ObjectName, Object> registeredMBeans = jmxAdapter.getLocalGemFireMBean();
- DistributedSystemBridge dsBridge = new DistributedSystemBridge(service, internalCache);
- this.aggregator = new MBeanAggregator(dsBridge);
+ DistributedSystemBridge distributedSystemBridge =
+ new DistributedSystemBridge(service, internalCache);
+ aggregator = new MBeanAggregator(distributedSystemBridge);
+
// register the aggregator for Federation framework to use
service.addProxyListener(aggregator);
- /*
- * get the local member mbean as it need to be provided to aggregator first
- */
+ // get the local member mbean as it need to be provided to aggregator first
+ MemberMXBean localMemberMXBean = service.getMemberMXBean();
- MemberMXBean localMember = service.getMemberMXBean();
ObjectName memberObjectName = MBeanJMXAdapter.getMemberMBeanName(
InternalDistributedSystem.getConnectedInstance().getDistributedMember());
- FederationComponent addedComp =
+ FederationComponent memberFederation =
service.getLocalManager().getFedComponents().get(memberObjectName);
- service.afterCreateProxy(memberObjectName, MemberMXBean.class, localMember, addedComp);
+ service.afterCreateProxy(memberObjectName, MemberMXBean.class, localMemberMXBean,
+ memberFederation);
+
+ MBeanJMXAdapter jmxAdapter = service.getJMXAdapter();
+ Map<ObjectName, Object> registeredMBeans = jmxAdapter.getLocalGemFireMBean();
for (ObjectName objectName : registeredMBeans.keySet()) {
if (objectName.equals(memberObjectName)) {
continue;
}
Object object = registeredMBeans.get(objectName);
- ObjectInstance instance;
try {
- instance = mbeanServer.getObjectInstance(objectName);
+ ObjectInstance instance = mbeanServer.getObjectInstance(objectName);
String className = instance.getClassName();
- Class cls = ClassLoadUtil.classFromName(className);
- Type[] intfTyps = cls.getGenericInterfaces();
+ Class clazz = ClassLoadUtil.classFromName(className);
+ Type[] interfaceTypes = clazz.getGenericInterfaces();
- FederationComponent newObj = service.getLocalManager().getFedComponents().get(objectName);
+ FederationComponent federation =
+ service.getLocalManager().getFedComponents().get(objectName);
- for (Type intfTyp1 : intfTyps) {
- Class intfTyp = (Class) intfTyp1;
- service.afterCreateProxy(objectName, intfTyp, object, newObj);
+ for (Type interfaceType : interfaceTypes) {
+ Class interfaceTypeAsClass = (Class) interfaceType;
+ service.afterCreateProxy(objectName, interfaceTypeAsClass, object, federation);
}
} catch (InstanceNotFoundException e) {
@@ -263,75 +281,69 @@
* Handles all the clean up activities when a Manager is stopped It clears the distributed mbeans
* and underlying data structures
*/
- protected void handleManagerStop() throws ManagementException {
+ void handleManagerStop() throws ManagementException {
if (!isServiceInitialised("handleManagerStop")) {
return;
}
- MBeanJMXAdapter jmxAdapter = service.getJMXAdapter();
- Map<ObjectName, Object> registeredMBeans = jmxAdapter.getLocalGemFireMBean();
- ObjectName aggregatemMBeanPattern;
- try {
- aggregatemMBeanPattern = new ObjectName(ManagementConstants.AGGREGATE_MBEAN_PATTERN);
- } catch (MalformedObjectNameException | NullPointerException e1) {
- throw new ManagementException(e1);
- }
-
- MemberMXBean localMember = service.getMemberMXBean();
+ MemberMXBean localMemberMXBean = service.getMemberMXBean();
ObjectName memberObjectName = MBeanJMXAdapter.getMemberMBeanName(
InternalDistributedSystem.getConnectedInstance().getDistributedMember());
- FederationComponent removedComp =
+ FederationComponent memberFederation =
service.getLocalManager().getFedComponents().get(memberObjectName);
- service.afterRemoveProxy(memberObjectName, MemberMXBean.class, localMember, removedComp);
+ service.afterRemoveProxy(memberObjectName, MemberMXBean.class, localMemberMXBean,
+ memberFederation);
+ ObjectName aggregateMBeanPattern = aggregateMBeanPattern();
+ MBeanJMXAdapter jmxAdapter = service.getJMXAdapter();
+ Map<ObjectName, Object> registeredMBeans = jmxAdapter.getLocalGemFireMBean();
for (ObjectName objectName : registeredMBeans.keySet()) {
if (objectName.equals(memberObjectName)) {
continue;
}
- if (aggregatemMBeanPattern.apply(objectName)) {
+ if (aggregateMBeanPattern.apply(objectName)) {
continue;
}
Object object = registeredMBeans.get(objectName);
- ObjectInstance instance;
try {
- instance = mbeanServer.getObjectInstance(objectName);
+ ObjectInstance instance = mbeanServer.getObjectInstance(objectName);
String className = instance.getClassName();
- Class cls = ClassLoadUtil.classFromName(className);
- Type[] intfTyps = cls.getGenericInterfaces();
+ Class clazz = ClassLoadUtil.classFromName(className);
+ Type[] interfaceTypes = clazz.getGenericInterfaces();
- FederationComponent oldObj = service.getLocalManager().getFedComponents().get(objectName);
+ FederationComponent federation =
+ service.getLocalManager().getFedComponents().get(objectName);
- for (Type intfTyp1 : intfTyps) {
- Class intfTyp = (Class) intfTyp1;
- service.afterRemoveProxy(objectName, intfTyp, object, oldObj);
+ for (Type interfaceType : interfaceTypes) {
+ Class interfaceTypeClass = (Class) interfaceType;
+ service.afterRemoveProxy(objectName, interfaceTypeClass, object, federation);
}
} catch (InstanceNotFoundException | ClassNotFoundException e) {
logger.warn("Failed to invoke aggregator for {} with exception {}", objectName,
e.getMessage(), e);
}
}
- service.removeProxyListener(this.aggregator);
- this.aggregator = null;
+
+ service.removeProxyListener(aggregator);
+ aggregator = null;
}
/**
* Assumption is always cache and MemberMbean has been will be created first
*/
- protected void handleManagerCreation() throws ManagementException {
+ void handleManagerCreation() throws ManagementException {
if (!isServiceInitialised("handleManagerCreation")) {
return;
}
- ObjectName managerMBeanName = MBeanJMXAdapter.getManagerName();
+ ObjectName objectName = MBeanJMXAdapter.getManagerName();
+ ManagerMBeanBridge managerMBeanBridge = new ManagerMBeanBridge(service);
+ ManagerMXBean managerMXBean = new ManagerMBean(managerMBeanBridge);
- ManagerMBeanBridge bridge = new ManagerMBeanBridge(service);
-
- ManagerMXBean bean = new ManagerMBean(bridge);
-
- service.registerInternalMBean(bean, managerMBeanName);
+ service.registerInternalMBean(managerMXBean, objectName);
}
/**
@@ -340,87 +352,85 @@
*
* @param region the region for which the call back is invoked
*/
- public <K, V> void handleRegionCreation(Region<K, V> region) throws ManagementException {
+ <K, V> void handleRegionCreation(Region<K, V> region) throws ManagementException {
if (!isServiceInitialised("handleRegionCreation")) {
return;
}
+
// Moving region creation operation inside a guarded block
// After getting access to regionOpLock it again checks for region
// destroy status
synchronized (regionOpLock) {
- LocalRegion localRegion = (LocalRegion) region;
- if (localRegion.isDestroyed()) {
+ if (region.isDestroyed()) {
return;
}
+
// Bridge is responsible for extracting data from GemFire Layer
- RegionMBeanBridge<K, V> bridge = RegionMBeanBridge.getInstance(region);
-
- RegionMXBean regionMBean = new RegionMBean<>(bridge);
- ObjectName regionMBeanName = MBeanJMXAdapter.getRegionMBeanName(
+ RegionMBeanBridge<K, V> regionMBeanBridge = RegionMBeanBridge.getInstance(region);
+ RegionMXBean regionMXBean = new RegionMBean<>(regionMBeanBridge);
+ ObjectName objectName = MBeanJMXAdapter.getRegionMBeanName(
internalCache.getDistributedSystem().getDistributedMember(), region.getFullPath());
- ObjectName changedMBeanName = service.registerInternalMBean(regionMBean, regionMBeanName);
- service.federate(changedMBeanName, RegionMXBean.class, true);
+ ObjectName federatedName = service.registerInternalMBean(regionMXBean, objectName);
+ service.federate(federatedName, RegionMXBean.class, true);
- Notification notification = new Notification(JMXNotificationType.REGION_CREATED, memberSource,
+ Notification notification = new Notification(REGION_CREATED, memberSource,
SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.REGION_CREATED_PREFIX + region.getFullPath());
- memberLevelNotifEmitter.sendNotification(notification);
+ REGION_CREATED_PREFIX + region.getFullPath());
+ memberLevelNotificationEmitter.sendNotification(notification);
+
memberMBeanBridge.addRegion(region);
}
}
/**
* Handles Disk Creation. Will create DiskStoreMXBean and will send a notification
- *
- * @param disk the disk store for which the call back is invoked
*/
- protected void handleDiskCreation(DiskStore disk) throws ManagementException {
+ void handleDiskCreation(DiskStore diskStore) throws ManagementException {
if (!isServiceInitialised("handleDiskCreation")) {
return;
}
- DiskStoreMBeanBridge bridge = new DiskStoreMBeanBridge(disk);
- DiskStoreMXBean diskStoreMBean = new DiskStoreMBean(bridge);
- ObjectName diskStoreMBeanName = MBeanJMXAdapter.getDiskStoreMBeanName(
- internalCache.getDistributedSystem().getDistributedMember(), disk.getName());
- ObjectName changedMBeanName = service.registerInternalMBean(diskStoreMBean, diskStoreMBeanName);
- service.federate(changedMBeanName, DiskStoreMXBean.class, true);
+ DiskStoreMBeanBridge diskStoreMBeanBridge = new DiskStoreMBeanBridge(diskStore);
+ DiskStoreMXBean diskStoreMXBean = new DiskStoreMBean(diskStoreMBeanBridge);
+ ObjectName objectName = MBeanJMXAdapter.getDiskStoreMBeanName(
+ internalCache.getDistributedSystem().getDistributedMember(), diskStore.getName());
+ ObjectName federatedName = service.registerInternalMBean(diskStoreMXBean, objectName);
+ service.federate(federatedName, DiskStoreMXBean.class, true);
- Notification notification = new Notification(JMXNotificationType.DISK_STORE_CREATED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.DISK_STORE_CREATED_PREFIX + disk.getName());
- memberLevelNotifEmitter.sendNotification(notification);
- memberMBeanBridge.addDiskStore(disk);
+ Notification notification = new Notification(DISK_STORE_CREATED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(),
+ DISK_STORE_CREATED_PREFIX + diskStore.getName());
+ memberLevelNotificationEmitter.sendNotification(notification);
+
+ memberMBeanBridge.addDiskStore(diskStore);
}
/**
* Handles LockService Creation
*
*/
- protected void handleLockServiceCreation(DLockService lockService) throws ManagementException {
+ void handleLockServiceCreation(DLockService lockService) throws ManagementException {
if (!isServiceInitialised("handleLockServiceCreation")) {
return;
}
// Internal Locks Should not be exposed to client for monitoring
- if (internalLocks.contains(lockService.getName())) {
+ if (INTERNAL_LOCK_SERVICES.contains(lockService.getName())) {
return;
}
- LockServiceMBeanBridge bridge = new LockServiceMBeanBridge(lockService);
- LockServiceMXBean lockServiceMBean = new LockServiceMBean(bridge);
- ObjectName lockServiceMBeanName = MBeanJMXAdapter.getLockServiceMBeanName(
+ LockServiceMBeanBridge lockServiceMBeanBridge = new LockServiceMBeanBridge(lockService);
+ LockServiceMXBean lockServiceMXBean = new LockServiceMBean(lockServiceMBeanBridge);
+ ObjectName objectName = MBeanJMXAdapter.getLockServiceMBeanName(
internalCache.getDistributedSystem().getDistributedMember(), lockService.getName());
+ ObjectName federatedName =
+ service.registerInternalMBean(lockServiceMXBean, objectName);
+ service.federate(federatedName, LockServiceMXBean.class, true);
- ObjectName changedMBeanName =
- service.registerInternalMBean(lockServiceMBean, lockServiceMBeanName);
-
- service.federate(changedMBeanName, LockServiceMXBean.class, true);
-
- Notification notification = new Notification(JMXNotificationType.LOCK_SERVICE_CREATED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.LOCK_SERVICE_CREATED_PREFIX + lockService.getName());
- memberLevelNotifEmitter.sendNotification(notification);
+ Notification notification = new Notification(LOCK_SERVICE_CREATED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(),
+ LOCK_SERVICE_CREATED_PREFIX + lockService.getName());
+ memberLevelNotificationEmitter.sendNotification(notification);
memberMBeanBridge.addLockServiceStats(lockService);
}
@@ -430,165 +440,151 @@
*
* @param sender the specific gateway sender
*/
- protected void handleGatewaySenderCreation(GatewaySender sender) throws ManagementException {
+ void handleGatewaySenderCreation(GatewaySender sender) throws ManagementException {
if (!isServiceInitialised("handleGatewaySenderCreation")) {
return;
}
- GatewaySenderMBeanBridge bridge = new GatewaySenderMBeanBridge(sender);
- GatewaySenderMXBean senderMBean = new GatewaySenderMBean(bridge);
- ObjectName senderObjectName = MBeanJMXAdapter.getGatewaySenderMBeanName(
+ GatewaySenderMBeanBridge gatewaySenderMBeanBridge = new GatewaySenderMBeanBridge(sender);
+ GatewaySenderMXBean gatewaySenderMXBean = new GatewaySenderMBean(gatewaySenderMBeanBridge);
+ ObjectName objectName = MBeanJMXAdapter.getGatewaySenderMBeanName(
internalCache.getDistributedSystem().getDistributedMember(), sender.getId());
+ ObjectName federatedName = service.registerInternalMBean(gatewaySenderMXBean, objectName);
+ service.federate(federatedName, GatewaySenderMXBean.class, true);
- ObjectName changedMBeanName = service.registerInternalMBean(senderMBean, senderObjectName);
-
- service.federate(changedMBeanName, GatewaySenderMXBean.class, true);
-
- Notification notification = new Notification(JMXNotificationType.GATEWAY_SENDER_CREATED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.GATEWAY_SENDER_CREATED_PREFIX);
- memberLevelNotifEmitter.sendNotification(notification);
+ Notification notification = new Notification(GATEWAY_SENDER_CREATED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(), GATEWAY_SENDER_CREATED_PREFIX);
+ memberLevelNotificationEmitter.sendNotification(notification);
}
/**
* Handles Gateway receiver creation
*
- * @param recv specific gateway receiver
+ * @param gatewayReceiver specific gateway receiver
*/
- protected void handleGatewayReceiverCreate(GatewayReceiver recv) throws ManagementException {
+ void handleGatewayReceiverCreate(GatewayReceiver gatewayReceiver) throws ManagementException {
if (!isServiceInitialised("handleGatewayReceiverCreate")) {
return;
}
- if (!recv.isManualStart()) {
+ if (!gatewayReceiver.isManualStart()) {
return;
}
- createGatewayReceiverMBean(recv);
+ createGatewayReceiverMBean(gatewayReceiver);
}
- private void createGatewayReceiverMBean(GatewayReceiver recv) {
- GatewayReceiverMBeanBridge bridge = new GatewayReceiverMBeanBridge(recv);
-
- GatewayReceiverMXBean receiverMBean = new GatewayReceiverMBean(bridge);
- ObjectName recvObjectName = MBeanJMXAdapter
+ private void createGatewayReceiverMBean(GatewayReceiver gatewayReceiver) {
+ GatewayReceiverMBeanBridge gatewayReceiverMBeanBridge =
+ new GatewayReceiverMBeanBridge(gatewayReceiver);
+ GatewayReceiverMXBean gatewayReceiverMXBean =
+ new GatewayReceiverMBean(gatewayReceiverMBeanBridge);
+ ObjectName objectName = MBeanJMXAdapter
.getGatewayReceiverMBeanName(internalCache.getDistributedSystem().getDistributedMember());
+ ObjectName federatedName = service.registerInternalMBean(gatewayReceiverMXBean, objectName);
+ service.federate(federatedName, GatewayReceiverMXBean.class, true);
- ObjectName changedMBeanName = service.registerInternalMBean(receiverMBean, recvObjectName);
-
- service.federate(changedMBeanName, GatewayReceiverMXBean.class, true);
-
- Notification notification = new Notification(JMXNotificationType.GATEWAY_RECEIVER_CREATED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.GATEWAY_RECEIVER_CREATED_PREFIX);
- memberLevelNotifEmitter.sendNotification(notification);
+ Notification notification = new Notification(GATEWAY_RECEIVER_CREATED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(), GATEWAY_RECEIVER_CREATED_PREFIX);
+ memberLevelNotificationEmitter.sendNotification(notification);
}
- /**
- * Handles Gateway receiver destroy
- *
- * @param recv specific gateway receiver
- */
- protected void handleGatewayReceiverDestroy(GatewayReceiver recv) throws ManagementException {
+ void handleGatewayReceiverDestroy() throws ManagementException {
if (!isServiceInitialised("handleGatewayReceiverDestroy")) {
return;
}
- GatewayReceiverMBean mbean = (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean();
- GatewayReceiverMBeanBridge bridge = mbean.getBridge();
+ GatewayReceiverMBean gatewayReceiverMBean =
+ (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean();
+ GatewayReceiverMBeanBridge gatewayReceiverMBeanBridge = gatewayReceiverMBean.getBridge();
- bridge.destroyServer();
- ObjectName objectName = (MBeanJMXAdapter
- .getGatewayReceiverMBeanName(internalCache.getDistributedSystem().getDistributedMember()));
+ gatewayReceiverMBeanBridge.destroyServer();
+ ObjectName objectName = MBeanJMXAdapter
+ .getGatewayReceiverMBeanName(internalCache.getDistributedSystem().getDistributedMember());
service.unregisterMBean(objectName);
- Notification notification = new Notification(JMXNotificationType.GATEWAY_RECEIVER_DESTROYED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.GATEWAY_RECEIVER_DESTROYED_PREFIX);
- memberLevelNotifEmitter.sendNotification(notification);
+
+ Notification notification = new Notification(GATEWAY_RECEIVER_DESTROYED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(), GATEWAY_RECEIVER_DESTROYED_PREFIX);
+ memberLevelNotificationEmitter.sendNotification(notification);
}
/**
* Handles Gateway receiver creation
*
- * @param recv specific gateway receiver
+ * @param gatewayReceiver specific gateway receiver
*/
- protected void handleGatewayReceiverStart(GatewayReceiver recv) throws ManagementException {
+ void handleGatewayReceiverStart(GatewayReceiver gatewayReceiver) throws ManagementException {
if (!isServiceInitialised("handleGatewayReceiverStart")) {
return;
}
- if (!recv.isManualStart()) {
- createGatewayReceiverMBean(recv);
+ if (!gatewayReceiver.isManualStart()) {
+ createGatewayReceiverMBean(gatewayReceiver);
}
- GatewayReceiverMBean mbean = (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean();
- GatewayReceiverMBeanBridge bridge = mbean.getBridge();
+ GatewayReceiverMBean gatewayReceiverMBean =
+ (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean();
+ GatewayReceiverMBeanBridge gatewayReceiverMBeanBridge = gatewayReceiverMBean.getBridge();
- bridge.startServer();
+ gatewayReceiverMBeanBridge.startServer();
- Notification notification = new Notification(JMXNotificationType.GATEWAY_RECEIVER_STARTED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.GATEWAY_RECEIVER_STARTED_PREFIX);
- memberLevelNotifEmitter.sendNotification(notification);
+ Notification notification = new Notification(GATEWAY_RECEIVER_STARTED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(), GATEWAY_RECEIVER_STARTED_PREFIX);
+ memberLevelNotificationEmitter.sendNotification(notification);
}
- /**
- * Handles Gateway receiver creation
- *
- * @param recv specific gateway receiver
- */
- protected void handleGatewayReceiverStop(GatewayReceiver recv) throws ManagementException {
+ void handleGatewayReceiverStop() throws ManagementException {
if (!isServiceInitialised("handleGatewayReceiverStop")) {
return;
}
- GatewayReceiverMBean mbean = (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean();
- GatewayReceiverMBeanBridge bridge = mbean.getBridge();
- bridge.stopServer();
+ GatewayReceiverMBean gatewayReceiverMBean =
+ (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean();
+ GatewayReceiverMBeanBridge gatewayReceiverMBeanBridge = gatewayReceiverMBean.getBridge();
- Notification notification = new Notification(JMXNotificationType.GATEWAY_RECEIVER_STOPPED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.GATEWAY_RECEIVER_STOPPED_PREFIX);
- memberLevelNotifEmitter.sendNotification(notification);
+ gatewayReceiverMBeanBridge.stopServer();
+
+ Notification notification = new Notification(GATEWAY_RECEIVER_STOPPED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(), GATEWAY_RECEIVER_STOPPED_PREFIX);
+ memberLevelNotificationEmitter.sendNotification(notification);
}
- protected void handleAsyncEventQueueCreation(AsyncEventQueue queue) throws ManagementException {
+ void handleAsyncEventQueueCreation(AsyncEventQueue asyncEventQueue) throws ManagementException {
if (!isServiceInitialised("handleAsyncEventQueueCreation")) {
return;
}
- AsyncEventQueueMBeanBridge bridge = new AsyncEventQueueMBeanBridge(queue);
- AsyncEventQueueMXBean queueMBean = new AsyncEventQueueMBean(bridge);
- ObjectName senderObjectName = MBeanJMXAdapter.getAsyncEventQueueMBeanName(
- internalCache.getDistributedSystem().getDistributedMember(), queue.getId());
- ObjectName changedMBeanName = service.registerInternalMBean(queueMBean, senderObjectName);
+ AsyncEventQueueMBeanBridge asyncEventQueueMBeanBridge =
+ new AsyncEventQueueMBeanBridge(asyncEventQueue);
+ AsyncEventQueueMXBean asyncEventQueueMXBean =
+ new AsyncEventQueueMBean(asyncEventQueueMBeanBridge);
+ ObjectName objectName = MBeanJMXAdapter.getAsyncEventQueueMBeanName(
+ internalCache.getDistributedSystem().getDistributedMember(), asyncEventQueue.getId());
+ ObjectName federatedName = service.registerInternalMBean(asyncEventQueueMXBean, objectName);
+ service.federate(federatedName, AsyncEventQueueMXBean.class, true);
- service.federate(changedMBeanName, AsyncEventQueueMXBean.class, true);
-
- Notification notification = new Notification(JMXNotificationType.ASYNC_EVENT_QUEUE_CREATED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.ASYNC_EVENT_QUEUE_CREATED_PREFIX);
- memberLevelNotifEmitter.sendNotification(notification);
+ Notification notification = new Notification(ASYNC_EVENT_QUEUE_CREATED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(), ASYNC_EVENT_QUEUE_CREATED_PREFIX);
+ memberLevelNotificationEmitter.sendNotification(notification);
}
- /**
- * Handles AsyncEventQueue Removal
- *
- * @param queue The AsyncEventQueue being removed
- */
- protected void handleAsyncEventQueueRemoval(AsyncEventQueue queue) throws ManagementException {
+ void handleAsyncEventQueueRemoval(AsyncEventQueue asyncEventQueue) throws ManagementException {
if (!isServiceInitialised("handleAsyncEventQueueRemoval")) {
return;
}
- ObjectName asycnEventQueueMBeanName = MBeanJMXAdapter.getAsyncEventQueueMBeanName(
- internalCache.getDistributedSystem().getDistributedMember(), queue.getId());
- AsyncEventQueueMBean bean;
+ ObjectName objectName = MBeanJMXAdapter.getAsyncEventQueueMBeanName(
+ internalCache.getDistributedSystem().getDistributedMember(), asyncEventQueue.getId());
+
try {
- bean = (AsyncEventQueueMBean) service.getLocalAsyncEventQueueMXBean(queue.getId());
- if (bean == null) {
+ AsyncEventQueueMBean asyncEventQueueMBean =
+ (AsyncEventQueueMBean) service.getLocalAsyncEventQueueMXBean(asyncEventQueue.getId());
+ if (asyncEventQueueMBean == null) {
return;
}
+
+ asyncEventQueueMBean.stopMonitor();
+
} catch (ManagementException e) {
// If no bean found its a NO-OP
if (logger.isDebugEnabled()) {
@@ -597,55 +593,39 @@
return;
}
- bean.stopMonitor();
+ service.unregisterMBean(objectName);
- service.unregisterMBean(asycnEventQueueMBeanName);
-
- Notification notification = new Notification(JMXNotificationType.ASYNC_EVENT_QUEUE_CLOSED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.ASYNC_EVENT_QUEUE_CLOSED_PREFIX + queue.getId());
- memberLevelNotifEmitter.sendNotification(notification);
+ Notification notification = new Notification(ASYNC_EVENT_QUEUE_CLOSED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(),
+ ASYNC_EVENT_QUEUE_CLOSED_PREFIX + asyncEventQueue.getId());
+ memberLevelNotificationEmitter.sendNotification(notification);
}
/**
* Sends the alert with the Object source as member. This notification will get filtered out for
* particular alert level
- *
*/
- protected void handleSystemNotification(AlertDetails details) {
+ void handleSystemNotification(AlertDetails alertDetails) {
if (!isServiceInitialised("handleSystemNotification")) {
return;
}
+
if (service.isManager()) {
String systemSource = "DistributedSystem("
+ service.getDistributedSystemMXBean().getDistributedSystemId() + ")";
- Map<String, String> userData = prepareUserData(details);
-
-
- Notification notification = new Notification(JMXNotificationType.SYSTEM_ALERT, systemSource,
- SequenceNumber.next(), details.getMsgTime().getTime(), details.getMsg());
-
- notification.setUserData(userData);
+ Notification notification = new Notification(SYSTEM_ALERT, systemSource,
+ SequenceNumber.next(), alertDetails.getMsgTime().getTime(), alertDetails.getMsg());
+ notification.setUserData(prepareUserData(alertDetails));
service.handleNotification(notification);
}
}
- private Map<String, String> prepareUserData(AlertDetails details) {
+ private Map<String, String> prepareUserData(AlertDetails alertDetails) {
Map<String, String> userData = new HashMap<>();
- userData.put(JMXNotificationUserData.ALERT_LEVEL,
- AlertDetails.getAlertLevelAsString(details.getAlertLevel()));
- String source = details.getSource();
- userData.put(JMXNotificationUserData.THREAD, source);
-
- InternalDistributedMember sender = details.getSender();
- String nameOrId = memberSource;
- if (sender != null) {
- nameOrId = sender.getName();
- nameOrId = StringUtils.isNotBlank(nameOrId) ? nameOrId : sender.getId();
- }
-
- userData.put(JMXNotificationUserData.MEMBER, nameOrId);
+ userData.put(ALERT_LEVEL, getAlertLevelAsString(alertDetails.getAlertLevel()));
+ userData.put(THREAD, alertDetails.getSource());
+ userData.put(MEMBER, getNameOrId(alertDetails.getSender()));
return userData;
}
@@ -655,83 +635,68 @@
*
* @param cacheServer cache server instance
*/
- protected void handleCacheServerStart(CacheServer cacheServer) {
+ void handleCacheServerStart(CacheServer cacheServer) {
if (!isServiceInitialised("handleCacheServerStart")) {
return;
}
CacheServerBridge cacheServerBridge = new CacheServerBridge(internalCache, cacheServer);
cacheServerBridge.setMemberMBeanBridge(memberMBeanBridge);
-
CacheServerMBean cacheServerMBean = new CacheServerMBean(cacheServerBridge);
-
- ObjectName cacheServerMBeanName = MBeanJMXAdapter.getClientServiceMBeanName(
+ ObjectName objectName = MBeanJMXAdapter.getClientServiceMBeanName(
cacheServer.getPort(), internalCache.getDistributedSystem().getDistributedMember());
-
- ObjectName changedMBeanName =
- service.registerInternalMBean(cacheServerMBean, cacheServerMBeanName);
+ ObjectName federatedName = service.registerInternalMBean(cacheServerMBean, objectName);
ClientMembershipListener managementClientListener = new CacheServerMembershipListenerAdapter(
- cacheServerMBean, memberLevelNotifEmitter, changedMBeanName);
+ cacheServerMBean, memberLevelNotificationEmitter, federatedName);
ClientMembership.registerClientMembershipListener(managementClientListener);
-
cacheServerBridge.setClientMembershipListener(managementClientListener);
- service.federate(changedMBeanName, CacheServerMXBean.class, true);
+ service.federate(federatedName, CacheServerMXBean.class, true);
- Notification notification = new Notification(JMXNotificationType.CACHE_SERVER_STARTED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.CACHE_SERVER_STARTED_PREFIX);
-
- memberLevelNotifEmitter.sendNotification(notification);
+ Notification notification = new Notification(CACHE_SERVER_STARTED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(), CACHE_SERVER_STARTED_PREFIX);
+ memberLevelNotificationEmitter.sendNotification(notification);
memberMBeanBridge.setCacheServer(true);
}
/**
* Assumption is its a cache server instance. For Gateway receiver there will be a separate method
- *
- * @param server cache server instance
*/
- protected void handleCacheServerStop(CacheServer server) {
+ void handleCacheServerStop(CacheServer server) {
if (!isServiceInitialised("handleCacheServerStop")) {
return;
}
- CacheServerMBean mbean = (CacheServerMBean) service.getLocalCacheServerMXBean(server.getPort());
+ CacheServerMBean cacheServerMBean =
+ (CacheServerMBean) service.getLocalCacheServerMXBean(server.getPort());
- ClientMembershipListener listener = mbean.getBridge().getClientMembershipListener();
-
- if (listener != null) {
- ClientMembership.unregisterClientMembershipListener(listener);
+ ClientMembershipListener clientMembershipListener =
+ cacheServerMBean.getBridge().getClientMembershipListener();
+ if (clientMembershipListener != null) {
+ ClientMembership.unregisterClientMembershipListener(clientMembershipListener);
}
- mbean.stopMonitor();
+ cacheServerMBean.stopMonitor();
- ObjectName cacheServerMBeanName = MBeanJMXAdapter.getClientServiceMBeanName(server.getPort(),
+ ObjectName objectName = MBeanJMXAdapter.getClientServiceMBeanName(server.getPort(),
internalCache.getDistributedSystem().getDistributedMember());
- service.unregisterMBean(cacheServerMBeanName);
+ service.unregisterMBean(objectName);
- Notification notification = new Notification(JMXNotificationType.CACHE_SERVER_STOPPED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.CACHE_SERVER_STOPPED_PREFIX);
-
- memberLevelNotifEmitter.sendNotification(notification);
+ Notification notification = new Notification(CACHE_SERVER_STOPPED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(), CACHE_SERVER_STOPPED_PREFIX);
+ memberLevelNotificationEmitter.sendNotification(notification);
memberMBeanBridge.setCacheServer(false);
}
- /**
- * Handles Cache removal. It will automatically remove all MBeans from GemFire Domain
- *
- * @param cache GemFire Cache instance. For now client cache is not supported
- */
- protected void handleCacheRemoval(Cache cache) throws ManagementException {
+ void handleCacheRemoval() throws ManagementException {
if (!isServiceInitialised("handleCacheRemoval")) {
return;
}
- this.serviceInitialised = false;
+ serviceInitialised = false;
try {
cleanUpMonitors();
cleanBridgeResources();
@@ -746,48 +711,57 @@
} catch (Exception e) {
logger.warn(e.getMessage(), e);
} finally {
- this.internalCache = null;
- this.service = null;
- this.memberMBeanBridge = null;
- this.memberBean = null;
- this.memberLevelNotifEmitter = null;
+ internalCache = null;
+ service = null;
+ memberMBeanBridge = null;
+ memberBean = null;
+ memberLevelNotificationEmitter = null;
}
}
+ private String getNameOrId(DistributedMember member) {
+ if (member == null) {
+ return memberSource;
+ }
+ return isNotBlank(member.getName()) ? member.getName() : member.getId();
+ }
+
private void cleanUpMonitors() {
- MemberMBean bean = (MemberMBean) service.getMemberMXBean();
- if (bean != null) {
- bean.stopMonitor();
+ MemberMBean memberMBean = (MemberMBean) service.getMemberMXBean();
+ if (memberMBean != null) {
+ memberMBean.stopMonitor();
}
- Set<GatewaySender> senders = internalCache.getGatewaySenders();
+ Set<GatewaySender> gatewaySenders = internalCache.getGatewaySenders();
- if (senders != null && senders.size() > 0) {
- for (GatewaySender sender : senders) {
- GatewaySenderMBean senderMBean =
- (GatewaySenderMBean) service.getLocalGatewaySenderMXBean(sender.getId());
- if (senderMBean != null) {
- senderMBean.stopMonitor();
+ if (gatewaySenders != null && !gatewaySenders.isEmpty()) {
+ for (GatewaySender gatewaySender : gatewaySenders) {
+ GatewaySenderMBean gatewaySenderMBean =
+ (GatewaySenderMBean) service.getLocalGatewaySenderMXBean(gatewaySender.getId());
+ if (gatewaySenderMBean != null) {
+ gatewaySenderMBean.stopMonitor();
}
}
}
- GatewayReceiverMBean receiver = (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean();
- if (receiver != null) {
- receiver.stopMonitor();
+ GatewayReceiverMBean gatewayReceiverMBean =
+ (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean();
+ if (gatewayReceiverMBean != null) {
+ gatewayReceiverMBean.stopMonitor();
}
}
private void cleanBridgeResources() {
- List<CacheServer> servers = internalCache.getCacheServers();
+ List<CacheServer> cacheServers = internalCache.getCacheServers();
- if (servers != null && servers.size() > 0) {
- for (CacheServer server : servers) {
- CacheServerMBean mbean =
- (CacheServerMBean) service.getLocalCacheServerMXBean(server.getPort());
+ if (cacheServers != null && !cacheServers.isEmpty()) {
+ for (CacheServer cacheServer : cacheServers) {
+ CacheServerMBean cacheServerMBean =
+ (CacheServerMBean) service.getLocalCacheServerMXBean(cacheServer.getPort());
- if (mbean != null) {
- ClientMembershipListener listener = mbean.getBridge().getClientMembershipListener();
+ if (cacheServerMBean != null) {
+ ClientMembershipListener listener =
+ cacheServerMBean.getBridge().getClientMembershipListener();
if (listener != null) {
ClientMembership.unregisterClientMembershipListener(listener);
@@ -799,23 +773,24 @@
/**
* Handles particular region destroy or close operation it will remove the corresponding MBean
- *
*/
- protected void handleRegionRemoval(Region region) throws ManagementException {
+ void handleRegionRemoval(Region region) throws ManagementException {
if (!isServiceInitialised("handleRegionRemoval")) {
return;
}
- /*
- * Moved region remove operation to a guarded block. If a region is getting created it wont
- * allow it to destroy any region.
- */
+
+ // Moved region remove operation to a guarded block. If a region is getting created it won't
+ // allow it to destroy any region.
synchronized (regionOpLock) {
- ObjectName regionMBeanName = MBeanJMXAdapter.getRegionMBeanName(
+ ObjectName objectName = MBeanJMXAdapter.getRegionMBeanName(
internalCache.getDistributedSystem().getDistributedMember(), region.getFullPath());
- RegionMBean bean;
+
try {
- bean = (RegionMBean) service.getLocalRegionMBean(region.getFullPath());
+ RegionMBean regionMBean = (RegionMBean) service.getLocalRegionMBean(region.getFullPath());
+ if (regionMBean != null) {
+ regionMBean.stopMonitor();
+ }
} catch (ManagementException e) {
// If no bean found its a NO-OP
// Mostly for situation like DiskAccessException while creating region
@@ -826,37 +801,32 @@
return;
}
- if (bean != null) {
- bean.stopMonitor();
- }
- service.unregisterMBean(regionMBeanName);
+ service.unregisterMBean(objectName);
- Notification notification = new Notification(JMXNotificationType.REGION_CLOSED, memberSource,
+ Notification notification = new Notification(REGION_CLOSED, memberSource,
SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.REGION_CLOSED_PREFIX + region.getFullPath());
- memberLevelNotifEmitter.sendNotification(notification);
+ REGION_CLOSED_PREFIX + region.getFullPath());
+ memberLevelNotificationEmitter.sendNotification(notification);
+
memberMBeanBridge.removeRegion(region);
}
}
- /**
- * Handles DiskStore Removal
- *
- */
- protected void handleDiskRemoval(DiskStore disk) throws ManagementException {
+ void handleDiskRemoval(DiskStore diskStore) throws ManagementException {
if (!isServiceInitialised("handleDiskRemoval")) {
return;
}
- ObjectName diskStoreMBeanName = MBeanJMXAdapter.getDiskStoreMBeanName(
- internalCache.getDistributedSystem().getDistributedMember(), disk.getName());
+ ObjectName objectName = MBeanJMXAdapter.getDiskStoreMBeanName(
+ internalCache.getDistributedSystem().getDistributedMember(), diskStore.getName());
- DiskStoreMBean bean;
try {
- bean = (DiskStoreMBean) service.getLocalDiskStoreMBean(disk.getName());
- if (bean == null) {
+ DiskStoreMBean diskStoreMBean =
+ (DiskStoreMBean) service.getLocalDiskStoreMBean(diskStore.getName());
+ if (diskStoreMBean == null) {
return;
}
+ diskStoreMBean.stopMonitor();
} catch (ManagementException e) {
// If no bean found its a NO-OP
if (logger.isDebugEnabled()) {
@@ -865,192 +835,193 @@
return;
}
- bean.stopMonitor();
+ service.unregisterMBean(objectName);
- service.unregisterMBean(diskStoreMBeanName);
+ Notification notification = new Notification(DISK_STORE_CLOSED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(),
+ DISK_STORE_CLOSED_PREFIX + diskStore.getName());
+ memberLevelNotificationEmitter.sendNotification(notification);
- Notification notification = new Notification(JMXNotificationType.DISK_STORE_CLOSED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.DISK_STORE_CLOSED_PREFIX + disk.getName());
- memberLevelNotifEmitter.sendNotification(notification);
- memberMBeanBridge.removeDiskStore(disk);
+ memberMBeanBridge.removeDiskStore(diskStore);
}
- /**
- * Handles Lock Service Removal
- *
- * @param lockService lock service instance
- */
- protected void handleLockServiceRemoval(DLockService lockService) throws ManagementException {
+ void handleLockServiceRemoval(DLockService lockService) throws ManagementException {
if (!isServiceInitialised("handleLockServiceRemoval")) {
return;
}
- ObjectName lockServiceMBeanName = MBeanJMXAdapter.getLockServiceMBeanName(
+ ObjectName objectName = MBeanJMXAdapter.getLockServiceMBeanName(
internalCache.getDistributedSystem().getDistributedMember(), lockService.getName());
- LockServiceMXBean bean = service.getLocalLockServiceMBean(lockService.getName());
+ service.unregisterMBean(objectName);
- service.unregisterMBean(lockServiceMBeanName);
-
- Notification notification = new Notification(JMXNotificationType.LOCK_SERVICE_CLOSED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.LOCK_SERVICE_CLOSED_PREFIX + lockService.getName());
- memberLevelNotifEmitter.sendNotification(notification);
+ Notification notification = new Notification(LOCK_SERVICE_CLOSED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(),
+ LOCK_SERVICE_CLOSED_PREFIX + lockService.getName());
+ memberLevelNotificationEmitter.sendNotification(notification);
}
/**
* Handles management side call backs for a locator creation and start. Assumption is a cache will
* be created before hand.
*
+ * <p>
* There is no corresponding handleStopLocator() method. Locator will close the cache whenever its
* stopped and it should also shutdown all the management services by closing the cache.
*
* @param locator instance of locator which is getting started
*/
- protected void handleLocatorStart(Locator locator) throws ManagementException {
+ void handleLocatorStart(Locator locator) throws ManagementException {
if (!isServiceInitialised("handleLocatorCreation")) {
return;
}
- ObjectName locatorMBeanName = MBeanJMXAdapter
+ ObjectName objectName = MBeanJMXAdapter
.getLocatorMBeanName(internalCache.getDistributedSystem().getDistributedMember());
+ LocatorMBeanBridge locatorMBeanBridge = new LocatorMBeanBridge(locator);
+ LocatorMBean locatorMBean = new LocatorMBean(locatorMBeanBridge);
+ ObjectName federatedName = service.registerInternalMBean(locatorMBean, objectName);
+ service.federate(federatedName, LocatorMXBean.class, true);
- LocatorMBeanBridge bridge = new LocatorMBeanBridge(locator);
- LocatorMBean locatorMBean = new LocatorMBean(bridge);
-
- ObjectName changedMBeanName = service.registerInternalMBean(locatorMBean, locatorMBeanName);
-
- service.federate(changedMBeanName, LocatorMXBean.class, true);
-
- Notification notification =
- new Notification(JMXNotificationType.LOCATOR_STARTED, memberSource, SequenceNumber.next(),
- System.currentTimeMillis(), ManagementConstants.LOCATOR_STARTED_PREFIX);
-
- memberLevelNotifEmitter.sendNotification(notification);
-
+ Notification notification = new Notification(LOCATOR_STARTED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(), LOCATOR_STARTED_PREFIX);
+ memberLevelNotificationEmitter.sendNotification(notification);
}
- protected void handleGatewaySenderStart(GatewaySender sender) throws ManagementException {
+ void handleGatewaySenderStart(GatewaySender gatewaySender) throws ManagementException {
if (!isServiceInitialised("handleGatewaySenderStart")) {
return;
}
- if ((sender.getRemoteDSId() < 0)) {
+ if (gatewaySender.getRemoteDSId() < 0) {
return;
}
- GatewaySenderMBean bean =
- (GatewaySenderMBean) service.getLocalGatewaySenderMXBean(sender.getId());
- bean.getBridge().setDispatcher();
+ GatewaySenderMBean gatewaySenderMBean =
+ (GatewaySenderMBean) service.getLocalGatewaySenderMXBean(gatewaySender.getId());
- Notification notification = new Notification(JMXNotificationType.GATEWAY_SENDER_STARTED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.GATEWAY_SENDER_STARTED_PREFIX + sender.getId());
+ gatewaySenderMBean.getBridge().setDispatcher();
- memberLevelNotifEmitter.sendNotification(notification);
+ Notification notification = new Notification(GATEWAY_SENDER_STARTED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(),
+ GATEWAY_SENDER_STARTED_PREFIX + gatewaySender.getId());
+ memberLevelNotificationEmitter.sendNotification(notification);
}
- protected void handleGatewaySenderStop(GatewaySender sender) throws ManagementException {
+ void handleGatewaySenderStop(GatewaySender gatewaySender) throws ManagementException {
if (!isServiceInitialised("handleGatewaySenderStop")) {
return;
}
- Notification notification = new Notification(JMXNotificationType.GATEWAY_SENDER_STOPPED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.GATEWAY_SENDER_STOPPED_PREFIX + sender.getId());
-
- memberLevelNotifEmitter.sendNotification(notification);
+ Notification notification = new Notification(GATEWAY_SENDER_STOPPED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(),
+ GATEWAY_SENDER_STOPPED_PREFIX + gatewaySender.getId());
+ memberLevelNotificationEmitter.sendNotification(notification);
}
- protected void handleGatewaySenderPaused(GatewaySender sender) throws ManagementException {
+ void handleGatewaySenderPaused(GatewaySender gatewaySender) throws ManagementException {
if (!isServiceInitialised("handleGatewaySenderPaused")) {
return;
}
- Notification notification = new Notification(JMXNotificationType.GATEWAY_SENDER_PAUSED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.GATEWAY_SENDER_PAUSED_PREFIX + sender.getId());
-
- memberLevelNotifEmitter.sendNotification(notification);
+ Notification notification = new Notification(GATEWAY_SENDER_PAUSED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(),
+ GATEWAY_SENDER_PAUSED_PREFIX + gatewaySender.getId());
+ memberLevelNotificationEmitter.sendNotification(notification);
}
- protected void handleGatewaySenderResumed(GatewaySender sender) throws ManagementException {
+ void handleGatewaySenderResumed(GatewaySender gatewaySender) throws ManagementException {
if (!isServiceInitialised("handleGatewaySenderResumed")) {
return;
}
- Notification notification = new Notification(JMXNotificationType.GATEWAY_SENDER_RESUMED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.GATEWAY_SENDER_RESUMED_PREFIX + sender.getId());
-
- memberLevelNotifEmitter.sendNotification(notification);
+ Notification notification = new Notification(GATEWAY_SENDER_RESUMED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(),
+ GATEWAY_SENDER_RESUMED_PREFIX + gatewaySender.getId());
+ memberLevelNotificationEmitter.sendNotification(notification);
}
- protected void handleGatewaySenderRemoved(GatewaySender sender) throws ManagementException {
+ void handleGatewaySenderRemoved(GatewaySender gatewaySender) throws ManagementException {
if (!isServiceInitialised("handleGatewaySenderRemoved")) {
return;
}
- if ((sender.getRemoteDSId() < 0)) {
+ if (gatewaySender.getRemoteDSId() < 0) {
return;
}
- GatewaySenderMBean bean =
- (GatewaySenderMBean) service.getLocalGatewaySenderMXBean(sender.getId());
- bean.stopMonitor();
+ GatewaySenderMBean gatewaySenderMBean =
+ (GatewaySenderMBean) service.getLocalGatewaySenderMXBean(gatewaySender.getId());
+ gatewaySenderMBean.stopMonitor();
- ObjectName gatewaySenderName = MBeanJMXAdapter.getGatewaySenderMBeanName(
- internalCache.getDistributedSystem().getDistributedMember(), sender.getId());
- service.unregisterMBean(gatewaySenderName);
+ ObjectName objectName = MBeanJMXAdapter.getGatewaySenderMBeanName(
+ internalCache.getDistributedSystem().getDistributedMember(), gatewaySender.getId());
+ service.unregisterMBean(objectName);
- Notification notification = new Notification(JMXNotificationType.GATEWAY_SENDER_REMOVED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.GATEWAY_SENDER_REMOVED_PREFIX + sender.getId());
- memberLevelNotifEmitter.sendNotification(notification);
+ Notification notification = new Notification(GATEWAY_SENDER_REMOVED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(),
+ GATEWAY_SENDER_REMOVED_PREFIX + gatewaySender.getId());
+ memberLevelNotificationEmitter.sendNotification(notification);
}
- protected void handleCacheServiceCreation(CacheService cacheService) throws ManagementException {
+ void handleCacheServiceCreation(CacheService cacheService) throws ManagementException {
if (!isServiceInitialised("handleCacheServiceCreation")) {
return;
}
+
// Don't register the CacheServices in the Locator
InternalDistributedMember member =
internalCache.getInternalDistributedSystem().getDistributedMember();
if (member.getVmKind() == ClusterDistributionManager.LOCATOR_DM_TYPE) {
return;
}
- CacheServiceMBeanBase mbean = cacheService.getMBean();
- if (mbean != null) {
- String id = mbean.getId();
- ObjectName cacheServiceObjectName = MBeanJMXAdapter.getCacheServiceMBeanName(member, id);
- ObjectName changedMBeanName = service.registerInternalMBean(mbean, cacheServiceObjectName);
+ CacheServiceMBeanBase cacheServiceMBean = cacheService.getMBean();
+ if (cacheServiceMBean != null) {
+ String id = cacheServiceMBean.getId();
+ ObjectName objectName = MBeanJMXAdapter.getCacheServiceMBeanName(member, id);
+ ObjectName federatedName = service.registerInternalMBean(cacheServiceMBean, objectName);
+ service.federate(federatedName, cacheServiceMBean.getInterfaceClass(), true);
- service.federate(changedMBeanName, mbean.getInterfaceClass(), true);
-
- Notification notification = new Notification(JMXNotificationType.CACHE_SERVICE_CREATED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.CACHE_SERVICE_CREATED_PREFIX + id);
- memberLevelNotifEmitter.sendNotification(notification);
+ Notification notification = new Notification(CACHE_SERVICE_CREATED, memberSource,
+ SequenceNumber.next(), System.currentTimeMillis(),
+ CACHE_SERVICE_CREATED_PREFIX + id);
+ memberLevelNotificationEmitter.sendNotification(notification);
}
}
+ private ObjectName aggregateMBeanPattern() {
+ try {
+ return new ObjectName(AGGREGATE_MBEAN_PATTERN);
+ } catch (MalformedObjectNameException | NullPointerException e) {
+ throw new ManagementException(e);
+ }
+ }
+
+ private boolean isServiceInitialised(String method) {
+ if (!serviceInitialised) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Management Service is not initialised hence returning from {}", method);
+ }
+ }
+
+ return serviceInitialised;
+ }
+
/**
- * Private class which acts as a ClientMembershipListener to propagate client joined/left
- * notifications
+ * Propagates client joined/left notifications
*/
private static class CacheServerMembershipListenerAdapter
extends ClientMembershipListenerAdapter {
- private NotificationBroadcasterSupport serverLevelNotifEmitter;
- private NotificationBroadcasterSupport memberLevelNotifEmitter;
+ private final NotificationBroadcasterSupport serverLevelNotificationEmitter;
+ private final NotificationBroadcasterSupport memberLevelNotificationEmitter;
+ private final String serverSource;
- private String serverSource;
-
- public CacheServerMembershipListenerAdapter(
- NotificationBroadcasterSupport serverLevelNotifEmitter,
- NotificationBroadcasterSupport memberLevelNotifEmitter, ObjectName serverSource) {
- this.serverLevelNotifEmitter = serverLevelNotifEmitter;
- this.memberLevelNotifEmitter = memberLevelNotifEmitter;
+ private CacheServerMembershipListenerAdapter(
+ NotificationBroadcasterSupport serverLevelNotificationEmitter,
+ NotificationBroadcasterSupport memberLevelNotificationEmitter,
+ ObjectName serverSource) {
+ this.serverLevelNotificationEmitter = serverLevelNotificationEmitter;
+ this.memberLevelNotificationEmitter = memberLevelNotificationEmitter;
this.serverSource = serverSource.toString();
}
@@ -1060,11 +1031,12 @@
*/
@Override
public void memberJoined(ClientMembershipEvent event) {
- Notification notification = new Notification(JMXNotificationType.CLIENT_JOINED, serverSource,
+ Notification notification = new Notification(CLIENT_JOINED, serverSource,
SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.CLIENT_JOINED_PREFIX + event.getMemberId());
- serverLevelNotifEmitter.sendNotification(notification);
- memberLevelNotifEmitter.sendNotification(notification);
+ CLIENT_JOINED_PREFIX + event.getMemberId());
+
+ serverLevelNotificationEmitter.sendNotification(notification);
+ memberLevelNotificationEmitter.sendNotification(notification);
}
/**
@@ -1073,11 +1045,12 @@
*/
@Override
public void memberLeft(ClientMembershipEvent event) {
- Notification notification = new Notification(JMXNotificationType.CLIENT_LEFT, serverSource,
+ Notification notification = new Notification(CLIENT_LEFT, serverSource,
SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.CLIENT_LEFT_PREFIX + event.getMemberId());
- serverLevelNotifEmitter.sendNotification(notification);
- memberLevelNotifEmitter.sendNotification(notification);
+ CLIENT_LEFT_PREFIX + event.getMemberId());
+
+ serverLevelNotificationEmitter.sendNotification(notification);
+ memberLevelNotificationEmitter.sendNotification(notification);
}
/**
@@ -1086,24 +1059,12 @@
*/
@Override
public void memberCrashed(ClientMembershipEvent event) {
- Notification notification = new Notification(JMXNotificationType.CLIENT_CRASHED, serverSource,
+ Notification notification = new Notification(CLIENT_CRASHED, serverSource,
SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.CLIENT_CRASHED_PREFIX + event.getMemberId());
- serverLevelNotifEmitter.sendNotification(notification);
- memberLevelNotifEmitter.sendNotification(notification);
+ CLIENT_CRASHED_PREFIX + event.getMemberId());
+
+ serverLevelNotificationEmitter.sendNotification(notification);
+ memberLevelNotificationEmitter.sendNotification(notification);
}
-
}
-
- private boolean isServiceInitialised(String method) {
- if (!serviceInitialised) {
- if (logger.isDebugEnabled()) {
- logger.debug("Management Service is not initialised hence returning from {}", method);
- }
- return false;
- }
-
- return true;
- }
-
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
index a82a7fb..12edec3 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
@@ -128,7 +128,7 @@
break;
case CACHE_REMOVE:
InternalCache removedCache = (InternalCache) resource;
- adapter.handleCacheRemoval(removedCache);
+ adapter.handleCacheRemoval();
break;
case REGION_CREATE:
Region createdRegion = (Region) resource;
@@ -152,7 +152,7 @@
break;
case GATEWAYRECEIVER_DESTROY:
GatewayReceiver destroyedRecv = (GatewayReceiver) resource;
- adapter.handleGatewayReceiverDestroy(destroyedRecv);
+ adapter.handleGatewayReceiverDestroy();
break;
case GATEWAYRECEIVER_START:
GatewayReceiver startedRecv = (GatewayReceiver) resource;
@@ -160,7 +160,7 @@
break;
case GATEWAYRECEIVER_STOP:
GatewayReceiver stoppededRecv = (GatewayReceiver) resource;
- adapter.handleGatewayReceiverStop(stoppededRecv);
+ adapter.handleGatewayReceiverStop();
break;
case GATEWAYSENDER_CREATE:
GatewaySender sender = (GatewaySender) resource;
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java
index b776bed..c233935 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java
@@ -17,6 +17,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.when;
@@ -25,42 +26,47 @@
import org.junit.Before;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.mockito.ArgumentCaptor;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.alerting.internal.spi.AlertLevel;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.HasCachePerfStats;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalCacheForClientAccess;
import org.apache.geode.internal.cache.InternalRegionArguments;
+import org.apache.geode.internal.logging.LoggingExecutors;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.management.DistributedSystemMXBean;
+import org.apache.geode.test.junit.categories.JMXTest;
+@Category(JMXTest.class)
public class FederatingManagerTest {
+ private InternalCache cache;
+ private InternalCacheForClientAccess cacheForClientAccess;
private MBeanJMXAdapter jmxAdapter;
private ManagementResourceRepo repo;
- private InternalDistributedSystem system;
private SystemManagementService service;
- private InternalCache cache;
private StatisticsFactory statisticsFactory;
private StatisticsClock statisticsClock;
- private InternalCacheForClientAccess cacheForClientAccess;
+ private InternalDistributedSystem system;
@Before
public void setUp() throws Exception {
+ cache = mock(InternalCache.class);
+ cacheForClientAccess = mock(InternalCacheForClientAccess.class);
jmxAdapter = mock(MBeanJMXAdapter.class);
repo = mock(ManagementResourceRepo.class);
- system = mock(InternalDistributedSystem.class);
service = mock(SystemManagementService.class);
- cache = mock(InternalCache.class);
- statisticsFactory = mock(StatisticsFactory.class);
statisticsClock = mock(StatisticsClock.class);
- cacheForClientAccess = mock(InternalCacheForClientAccess.class);
+ statisticsFactory = mock(StatisticsFactory.class);
+ system = mock(InternalDistributedSystem.class);
+
DistributedSystemMXBean distributedSystemMXBean = mock(DistributedSystemMXBean.class);
when(cache.getCacheForProcessingClientRequests())
@@ -77,66 +83,182 @@
@Test
public void addMemberArtifactsCreatesMonitoringRegion() throws Exception {
- InternalDistributedMember member = member(1, 20);
- FederatingManager federatingManager = new FederatingManager(jmxAdapter, repo, system, service,
- cache, statisticsFactory, statisticsClock);
+ FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
+ statisticsFactory, statisticsClock,
+ new MBeanProxyFactory(jmxAdapter, service),
+ new MemberMessenger(jmxAdapter, system),
+ LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+ Runtime.getRuntime().availableProcessors()));
federatingManager.startManager();
- federatingManager.addMemberArtifacts(member);
+ federatingManager.addMemberArtifacts(member(1, 20));
- verify(cacheForClientAccess).createInternalRegion(eq("_monitoringRegion_null<v1>20"), any(),
- any());
+ verify(cacheForClientAccess)
+ .createInternalRegion(eq("_monitoringRegion_null<v1>20"), any(), any());
}
@Test
public void addMemberArtifactsCreatesMonitoringRegionWithHasOwnStats() throws Exception {
- InternalDistributedMember member = member(2, 40);
- FederatingManager federatingManager = new FederatingManager(jmxAdapter, repo, system, service,
- cache, statisticsFactory, statisticsClock);
+ FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
+ statisticsFactory, statisticsClock,
+ new MBeanProxyFactory(jmxAdapter, service),
+ new MemberMessenger(jmxAdapter, system),
+ LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+ Runtime.getRuntime().availableProcessors()));
federatingManager.startManager();
- federatingManager.addMemberArtifacts(member);
+ federatingManager.addMemberArtifacts(member(2, 40));
ArgumentCaptor<InternalRegionArguments> captor =
ArgumentCaptor.forClass(InternalRegionArguments.class);
- verify(cacheForClientAccess).createInternalRegion(eq("_monitoringRegion_null<v2>40"), any(),
- captor.capture());
-
- InternalRegionArguments internalRegionArguments = captor.getValue();
- HasCachePerfStats hasCachePerfStats = internalRegionArguments.getCachePerfStatsHolder();
- assertThat(hasCachePerfStats.hasOwnStats()).isTrue();
+ verify(cacheForClientAccess)
+ .createInternalRegion(eq("_monitoringRegion_null<v2>40"), any(), captor.capture());
+ boolean hasOwnStats = captor.getValue().getCachePerfStatsHolder().hasOwnStats();
+ assertThat(hasOwnStats)
+ .isTrue();
}
@Test
public void addMemberArtifactsCreatesNotificationRegion() throws Exception {
- InternalDistributedMember member = member(3, 60);
- FederatingManager federatingManager = new FederatingManager(jmxAdapter, repo, system, service,
- cache, statisticsFactory, statisticsClock);
+ FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
+ statisticsFactory, statisticsClock,
+ new MBeanProxyFactory(jmxAdapter, service),
+ new MemberMessenger(jmxAdapter, system),
+ LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+ Runtime.getRuntime().availableProcessors()));
federatingManager.startManager();
- federatingManager.addMemberArtifacts(member);
+ federatingManager.addMemberArtifacts(member(3, 60));
- verify(cacheForClientAccess).createInternalRegion(eq("_notificationRegion_null<v3>60"), any(),
- any());
+ verify(cacheForClientAccess)
+ .createInternalRegion(eq("_notificationRegion_null<v3>60"), any(), any());
}
@Test
public void addMemberArtifactsCreatesNotificationRegionWithHasOwnStats() throws Exception {
- InternalDistributedMember member = member(4, 80);
- FederatingManager federatingManager = new FederatingManager(jmxAdapter, repo, system, service,
- cache, statisticsFactory, statisticsClock);
+ FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
+ statisticsFactory, statisticsClock,
+ new MBeanProxyFactory(jmxAdapter, service),
+ new MemberMessenger(jmxAdapter, system),
+ LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+ Runtime.getRuntime().availableProcessors()));
federatingManager.startManager();
- federatingManager.addMemberArtifacts(member);
+ federatingManager.addMemberArtifacts(member(4, 80));
ArgumentCaptor<InternalRegionArguments> captor =
ArgumentCaptor.forClass(InternalRegionArguments.class);
- verify(cacheForClientAccess).createInternalRegion(eq("_notificationRegion_null<v4>80"), any(),
- captor.capture());
+ verify(cacheForClientAccess)
+ .createInternalRegion(eq("_notificationRegion_null<v4>80"), any(), captor.capture());
InternalRegionArguments internalRegionArguments = captor.getValue();
- HasCachePerfStats hasCachePerfStats = internalRegionArguments.getCachePerfStatsHolder();
- assertThat(hasCachePerfStats.hasOwnStats()).isTrue();
+ assertThat(internalRegionArguments.getCachePerfStatsHolder().hasOwnStats())
+ .isTrue();
+ }
+
+ @Test
+ public void removeMemberArtifactsLocallyDestroysMonitoringRegion() {
+ InternalDistributedMember member = member();
+ Region monitoringRegion = mock(Region.class);
+ when(repo.getEntryFromMonitoringRegionMap(eq(member)))
+ .thenReturn(monitoringRegion);
+ when(repo.getEntryFromNotifRegionMap(eq(member)))
+ .thenReturn(mock(Region.class));
+ when(system.getDistributedMember())
+ .thenReturn(member);
+ FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
+ statisticsFactory, statisticsClock,
+ new MBeanProxyFactory(jmxAdapter, service),
+ new MemberMessenger(jmxAdapter, system),
+ LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+ Runtime.getRuntime().availableProcessors()));
+
+ federatingManager.removeMemberArtifacts(member, false);
+
+ verify(monitoringRegion)
+ .localDestroyRegion();
+ }
+
+ @Test
+ public void removeMemberArtifactsLocallyDestroysNotificationRegion() {
+ InternalDistributedMember member = member();
+ Region notificationRegion = mock(Region.class);
+ when(repo.getEntryFromMonitoringRegionMap(eq(member)))
+ .thenReturn(mock(Region.class));
+ when(repo.getEntryFromNotifRegionMap(eq(member)))
+ .thenReturn(notificationRegion);
+ when(system.getDistributedMember())
+ .thenReturn(member);
+ FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
+ statisticsFactory, statisticsClock,
+ new MBeanProxyFactory(jmxAdapter, service),
+ new MemberMessenger(jmxAdapter, system),
+ LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+ Runtime.getRuntime().availableProcessors()));
+
+ federatingManager.removeMemberArtifacts(member, false);
+
+ verify(notificationRegion)
+ .localDestroyRegion();
+ }
+
+ @Test
+ public void removeMemberArtifactsDoesNotThrowIfMonitoringRegionIsAlreadyDestroyed() {
+ InternalDistributedMember member = member();
+ Region monitoringRegion = mock(Region.class);
+ doThrow(new RegionDestroyedException("test", "monitoringRegion"))
+ .when(monitoringRegion).localDestroyRegion();
+ when(repo.getEntryFromMonitoringRegionMap(eq(member)))
+ .thenReturn(monitoringRegion);
+ when(repo.getEntryFromNotifRegionMap(eq(member)))
+ .thenReturn(mock(Region.class));
+ when(system.getDistributedMember())
+ .thenReturn(member);
+ FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
+ statisticsFactory, statisticsClock,
+ new MBeanProxyFactory(jmxAdapter, service),
+ new MemberMessenger(jmxAdapter, system),
+ LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+ Runtime.getRuntime().availableProcessors()));
+
+ federatingManager.removeMemberArtifacts(member, false);
+
+ verify(monitoringRegion)
+ .localDestroyRegion();
+ }
+
+ @Test
+ public void removeMemberArtifactsDoesNotThrowIfNotificationRegionIsAlreadyDestroyed() {
+ InternalDistributedMember member = member();
+ Region notificationRegion = mock(Region.class);
+ doThrow(new RegionDestroyedException("test", "notificationRegion"))
+ .when(notificationRegion).localDestroyRegion();
+ when(repo.getEntryFromMonitoringRegionMap(eq(member)))
+ .thenReturn(mock(Region.class));
+ when(repo.getEntryFromNotifRegionMap(eq(member)))
+ .thenReturn(notificationRegion);
+ when(system.getDistributedMember())
+ .thenReturn(member);
+ FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
+ statisticsFactory, statisticsClock,
+ new MBeanProxyFactory(jmxAdapter, service),
+ new MemberMessenger(jmxAdapter, system),
+ LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+ Runtime.getRuntime().availableProcessors()));
+
+ federatingManager.removeMemberArtifacts(member, false);
+
+ verify(notificationRegion)
+ .localDestroyRegion();
+ }
+
+ @Test
+ public void removeMemberArtifactsDoesNotThrowIfMBeanProxyFactoryThrowsRegionDestroyedException() {
+
+ }
+
+ private InternalDistributedMember member() {
+ return member(1, 1);
}
private InternalDistributedMember member(int viewId, int port) {