GEODE-8696: Fix synchronization in FederatingManager (#5728)
Prevent hang while protecting against removal of member artifacts
during startup of manager.
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/CacheManagementDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/CacheManagementDUnitTest.java
index 4736161..7a1d52a 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/CacheManagementDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/CacheManagementDUnitTest.java
@@ -387,7 +387,7 @@
for (DistributedMember member : otherMembers) {
Set<ObjectName> proxyNames =
- service.getFederatingManager().getProxyFactory().findAllProxies(member);
+ service.getFederatingManager().proxyFactory().findAllProxies(member);
assertThat(proxyNames).isEmpty();
ObjectName proxyMBeanName = service.getMemberMBeanName(member);
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanDistributedTest.java
index 555c981..b25fc7a 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanDistributedTest.java
@@ -114,15 +114,18 @@
public void getMemberCount() {
// 1 manager, 3 members, 1 dunit locator
managerVM.invoke(() -> {
- await()
- .untilAsserted(() -> assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5));
+ await().untilAsserted(() -> {
+ assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5);
+ });
});
}
@Test
public void showJVMMetrics() {
managerVM.invoke(() -> {
- await().until(() -> distributedSystemMXBean.getMemberCount() == 5);
+ await().untilAsserted(() -> {
+ assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5);
+ });
for (DistributedMember member : getOtherNormalMembers()) {
assertThat(distributedSystemMXBean.showJVMMetrics(member.getName())).isNotNull();
@@ -133,7 +136,9 @@
@Test
public void showOSMetrics() {
managerVM.invoke(() -> {
- await().until(() -> distributedSystemMXBean.getMemberCount() == 5);
+ await().untilAsserted(() -> {
+ assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5);
+ });
Set<InternalDistributedMember> otherMembers = getOtherNormalMembers();
for (DistributedMember member : otherMembers) {
@@ -147,15 +152,18 @@
managerVM.invoke(() -> {
distributedSystemMXBean.shutDownAllMembers();
- await().untilAsserted(() -> assertThat(getOtherNormalMembers()).hasSize(0));
+ await().untilAsserted(() -> {
+ assertThat(getOtherNormalMembers()).hasSize(0);
+ });
});
}
@Test
public void listMemberObjectNames() {
managerVM.invoke(() -> {
- await().untilAsserted(
- () -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(4));
+ await().untilAsserted(() -> {
+ assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(4);
+ });
});
}
@@ -164,7 +172,9 @@
managerVM.invoke(() -> {
String memberName = distributedMember.getName();
- await().until(() -> distributedSystemMXBean.fetchMemberObjectName(memberName) != null);
+ await().untilAsserted(() -> {
+ assertThat(distributedSystemMXBean.fetchMemberObjectName(memberName)).isNotNull();
+ });
ObjectName memberMXBeanName = distributedSystemMXBean.fetchMemberObjectName(memberName);
assertThat(memberMXBeanName).isEqualTo(getMemberMBeanName(memberName));
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/JmxLocatorReconnectDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/JmxLocatorReconnectDistributedTest.java
new file mode 100644
index 0000000..9d5678e
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/JmxLocatorReconnectDistributedTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management;
+
+import static java.lang.management.ManagementFactory.getPlatformMBeanServer;
+import static java.util.Arrays.asList;
+import static javax.management.ObjectName.getInstance;
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT;
+import static org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper.crashDistributedSystem;
+import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.VM.getVMId;
+import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.QueryExp;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.ForcedDisconnectException;
+import org.apache.geode.alerting.internal.spi.AlertingIOException;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.distributed.DistributedSystemDisconnectedException;
+import org.apache.geode.distributed.LocatorLauncher;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.distributed.internal.membership.api.MemberDisconnectedException;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedErrorCollector;
+import org.apache.geode.test.dunit.rules.DistributedReference;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.categories.JMXTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+
+@Category(JMXTest.class)
+@SuppressWarnings({"serial", "CodeBlock2Expr", "SameParameterValue"})
+public class JmxLocatorReconnectDistributedTest implements Serializable {
+
+ private static final String LOCATOR_NAME = "locator";
+ private static final String SERVER_NAME = "server";
+ private static final String REGION_NAME = "region";
+ private static final QueryExp QUERY_ALL = null;
+
+ private static final ObjectName GEMFIRE_MXBEANS =
+ execute(() -> getInstance("GemFire:*"));
+ private static final Set<ObjectName> EXPECTED_SERVER_MXBEANS =
+ execute(() -> expectedServerMXBeans(SERVER_NAME, REGION_NAME));
+ private static final Set<ObjectName> EXPECTED_LOCATOR_MXBEANS =
+ execute(() -> expectedLocatorMXBeans(LOCATOR_NAME));
+ private static final Set<ObjectName> EXPECTED_DISTRIBUTED_MXBEANS =
+ execute(() -> expectedDistributedMXBeans(REGION_NAME));
+
+ private VM locatorVM;
+ private VM serverVM;
+
+ private String locators;
+ private int locatorPort;
+ private int locatorJmxPort;
+ private Set<ObjectName> mxbeansOnServer;
+ private Set<ObjectName> mxbeansOnLocator;
+
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule(2);
+ @Rule
+ public DistributedErrorCollector errorCollector = new DistributedErrorCollector();
+ @Rule
+ public DistributedReference<LocatorLauncher> locatorLauncher = new DistributedReference<>();
+ @Rule
+ public DistributedReference<ServerLauncher> serverLauncher = new DistributedReference<>();
+ @Rule
+ public DistributedRestoreSystemProperties restoreProps = new DistributedRestoreSystemProperties();
+ @Rule
+ public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+ @Rule
+ public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+ @Before
+ public void setUp() throws Exception {
+ locatorVM = getVM(0);
+ serverVM = getVM(1);
+
+ File locatorDir = temporaryFolder.newFolder(LOCATOR_NAME);
+ File serverDir = temporaryFolder.newFolder(SERVER_NAME);
+
+ for (VM vm : asList(locatorVM, serverVM)) {
+ vm.invoke(() -> System.setProperty(GEMFIRE_PREFIX + "standard-output-always-on", "true"));
+ }
+
+ int[] port = getRandomAvailableTCPPorts(2);
+ locatorPort = port[0];
+ locatorJmxPort = port[1];
+ locators = "localhost[" + locatorPort + "]";
+
+ locatorVM.invoke(() -> {
+ locatorLauncher.set(startLocator(locatorDir, locatorPort, locatorJmxPort, locators));
+ });
+
+ serverVM.invoke(() -> serverLauncher.set(startServer(serverDir, locators)));
+
+ gfsh.connectAndVerify(locatorJmxPort, GfshCommandRule.PortType.jmxManager);
+
+ String createRegionCommand = "create region --type=REPLICATE --name=" + SEPARATOR + REGION_NAME;
+ gfsh.executeAndAssertThat(createRegionCommand).statusIsSuccess();
+
+ addIgnoredException(AlertingIOException.class);
+ addIgnoredException(CacheClosedException.class);
+ addIgnoredException(CancelException.class);
+ addIgnoredException(DistributedSystemDisconnectedException.class);
+ addIgnoredException(ForcedDisconnectException.class);
+ addIgnoredException(MemberDisconnectedException.class);
+ addIgnoredException("Possible loss of quorum");
+
+ mxbeansOnServer = serverVM.invoke(() -> {
+ await().untilAsserted(() -> {
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on server")
+ .containsAll(EXPECTED_SERVER_MXBEANS);
+ });
+ return getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL);
+ });
+
+ mxbeansOnLocator = locatorVM.invoke(() -> {
+ await().untilAsserted(() -> {
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on locator")
+ .containsAll(EXPECTED_SERVER_MXBEANS)
+ .containsAll(EXPECTED_LOCATOR_MXBEANS)
+ .containsAll(EXPECTED_DISTRIBUTED_MXBEANS);
+ });
+ return getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL);
+ });
+ }
+
+ @Test
+ public void serverMXBeanOnServerIsUnaffectedByLocatorCrash() {
+ locatorVM.invoke(() -> {
+ crashDistributedSystem(locatorLauncher.get().getCache().getDistributedSystem());
+
+ await().untilAsserted(() -> {
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on locator")
+ .isEmpty();
+ });
+ });
+
+ serverVM.invoke(() -> {
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on server")
+ .containsExactlyInAnyOrderElementsOf(mxbeansOnServer);
+ });
+
+ locatorVM.invoke(() -> {
+ InternalLocator locator = (InternalLocator) locatorLauncher.get().getLocator();
+
+ await().untilAsserted(() -> {
+ boolean isReconnected = locator.isReconnected();
+ boolean isSharedConfigurationRunning = locator.isSharedConfigurationRunning();
+ Set<ObjectName> mbeanNames =
+ getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL);
+
+ assertThat(isReconnected)
+ .as("Locator is reconnected on locator")
+ .isTrue();
+ assertThat(isSharedConfigurationRunning)
+ .as("Locator shared configuration is running on locator")
+ .isTrue();
+ assertThat(mbeanNames)
+ .as("GemFire mxbeans on locator")
+ .isEqualTo(mxbeansOnLocator);
+ });
+ });
+
+ serverVM.invoke(() -> {
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on server")
+ .containsExactlyInAnyOrderElementsOf(mxbeansOnServer);
+ });
+ }
+
+ private static LocatorLauncher startLocator(File workingDirectory, int locatorPort, int jmxPort,
+ String locators) {
+ LocatorLauncher locatorLauncher = new LocatorLauncher.Builder()
+ .setMemberName(LOCATOR_NAME)
+ .setPort(locatorPort)
+ .setWorkingDirectory(workingDirectory.getAbsolutePath())
+ .set(HTTP_SERVICE_PORT, "0")
+ .set(JMX_MANAGER, "true")
+ .set(JMX_MANAGER_PORT, String.valueOf(jmxPort))
+ .set(JMX_MANAGER_START, "true")
+ .set(LOCATORS, locators)
+ .set(LOG_FILE, new File(workingDirectory, LOCATOR_NAME + ".log").getAbsolutePath())
+ .set(MAX_WAIT_TIME_RECONNECT, "1000")
+ .set(MEMBER_TIMEOUT, "2000")
+ .build();
+
+ locatorLauncher.start();
+
+ InternalLocator locator = (InternalLocator) locatorLauncher.getLocator();
+
+ await().untilAsserted(() -> {
+ assertThat(locator.isSharedConfigurationRunning())
+ .as("Locator shared configuration is running on locator" + getVMId())
+ .isTrue();
+ });
+
+ return locatorLauncher;
+ }
+
+ private static ServerLauncher startServer(File workingDirectory, String locators) {
+ ServerLauncher serverLauncher = new ServerLauncher.Builder()
+ .setDisableDefaultServer(true)
+ .setMemberName(SERVER_NAME)
+ .setWorkingDirectory(workingDirectory.getAbsolutePath())
+ .set(HTTP_SERVICE_PORT, "0")
+ .set(LOCATORS, locators)
+ .set(LOG_FILE, new File(workingDirectory, SERVER_NAME + ".log").getAbsolutePath())
+ .set(MAX_WAIT_TIME_RECONNECT, "1000")
+ .set(MEMBER_TIMEOUT, "2000")
+ .build();
+
+ serverLauncher.start();
+
+ return serverLauncher;
+ }
+
+ private static Set<ObjectName> expectedServerMXBeans(String memberName, String regionName)
+ throws MalformedObjectNameException {
+ return new HashSet<>(asList(
+ getInstance("GemFire:type=Member,member=" + memberName),
+ getInstance("GemFire:service=Region,name=" + SEPARATOR + regionName +
+ ",type=Member,member=" + memberName)));
+ }
+
+ private static Set<ObjectName> expectedLocatorMXBeans(String memberName)
+ throws MalformedObjectNameException {
+ return new HashSet<>(asList(
+ getInstance("GemFire:service=DiskStore,name=cluster_config,type=Member,member=" +
+ memberName),
+ getInstance("GemFire:service=Locator,type=Member,member=" + memberName),
+ getInstance("GemFire:service=LockService,name=__CLUSTER_CONFIG_LS,type=Member,member=" +
+ memberName),
+ getInstance("GemFire:type=Member,member=" + memberName),
+ getInstance("GemFire:service=Manager,type=Member,member=" + memberName)));
+ }
+
+ private static Set<ObjectName> expectedDistributedMXBeans(String regionName)
+ throws MalformedObjectNameException {
+ return new HashSet<>(asList(
+ getInstance("GemFire:service=AccessControl,type=Distributed"),
+ getInstance("GemFire:service=FileUploader,type=Distributed"),
+ getInstance("GemFire:service=LockService,name=__CLUSTER_CONFIG_LS,type=Distributed"),
+ getInstance("GemFire:service=Region,name=" + SEPARATOR + regionName + ",type=Distributed"),
+ getInstance("GemFire:service=System,type=Distributed")));
+ }
+
+ private static <V> V execute(Callable<V> task) {
+ try {
+ return task.call();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/JMXMBeanReconnectDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/JmxServerReconnectDistributedTest.java
similarity index 63%
rename from geode-core/src/distributedTest/java/org/apache/geode/management/JMXMBeanReconnectDUnitTest.java
rename to geode-core/src/distributedTest/java/org/apache/geode/management/JmxServerReconnectDistributedTest.java
index 3969f07..ab1b46c 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/JMXMBeanReconnectDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/JmxServerReconnectDistributedTest.java
@@ -32,14 +32,12 @@
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
-import static org.apache.geode.test.dunit.Disconnect.disconnectAllFromDS;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.apache.geode.test.dunit.VM.getVMId;
import static org.apache.geode.test.dunit.VM.toArray;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.Serializable;
@@ -47,11 +45,13 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import javax.management.QueryExp;
import org.junit.After;
import org.junit.Before;
@@ -73,6 +73,7 @@
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.DistributedErrorCollector;
+import org.apache.geode.test.dunit.rules.DistributedReference;
import org.apache.geode.test.dunit.rules.DistributedRule;
import org.apache.geode.test.junit.categories.JMXTest;
import org.apache.geode.test.junit.rules.GfshCommandRule;
@@ -80,61 +81,66 @@
import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
@Category(JMXTest.class)
-@SuppressWarnings("serial")
-public class JMXMBeanReconnectDUnitTest implements Serializable {
+@SuppressWarnings({"serial", "CodeBlock2Expr", "SameParameterValue"})
+public class JmxServerReconnectDistributedTest implements Serializable {
- private static final long TIMEOUT_MILLIS = getTimeout().toMillis();
- private static final LocatorLauncher DUMMY_LOCATOR = mock(LocatorLauncher.class);
- private static final ServerLauncher DUMMY_SERVER = mock(ServerLauncher.class);
+ private static final String LOCATOR_1_NAME = "locator1";
+ private static final String LOCATOR_2_NAME = "locator2";
+ private static final String SERVER_NAME = "server";
+ private static final String REGION_NAME = "region";
+ private static final QueryExp QUERY_ALL = null;
+
+ private static final ObjectName GEMFIRE_MXBEANS =
+ execute(() -> getInstance("GemFire:*"));
+ private static final Set<ObjectName> EXPECTED_SERVER_MXBEANS =
+ execute(() -> expectedServerMXBeans(SERVER_NAME, REGION_NAME));
+ private static final Set<ObjectName> EXPECTED_LOCATOR_1_MXBEANS =
+ execute(() -> expectedLocatorMXBeans(LOCATOR_1_NAME));
+ private static final Set<ObjectName> EXPECTED_LOCATOR_2_MXBEANS =
+ execute(() -> expectedLocatorMXBeans(LOCATOR_2_NAME));
+ private static final Set<ObjectName> EXPECTED_DISTRIBUTED_MXBEANS =
+ execute(() -> expectedDistributedMXBeans(REGION_NAME));
private static final AtomicReference<CountDownLatch> BEFORE =
new AtomicReference<>(new CountDownLatch(0));
private static final AtomicReference<CountDownLatch> AFTER =
new AtomicReference<>(new CountDownLatch(0));
- private static final AtomicReference<LocatorLauncher> LOCATOR =
- new AtomicReference<>(DUMMY_LOCATOR);
- private static final AtomicReference<ServerLauncher> SERVER =
- new AtomicReference<>(DUMMY_SERVER);
-
private VM locator1VM;
private VM locator2VM;
private VM serverVM;
- private String locator1Name;
- private String locator2Name;
- private String serverName;
private String locators;
private int locator1Port;
private int locator2Port;
private int locator1JmxPort;
private int locator2JmxPort;
- private String regionName;
private Set<ObjectName> mxbeansOnServer;
private Set<ObjectName> mxbeansOnLocator1;
private Set<ObjectName> mxbeansOnLocator2;
@Rule
- public DistributedRule distributedRule = new DistributedRule();
- @Rule
- public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+ public DistributedRule distributedRule = new DistributedRule(3);
@Rule
public DistributedErrorCollector errorCollector = new DistributedErrorCollector();
@Rule
+ public DistributedReference<LocatorLauncher> locatorLauncher = new DistributedReference<>();
+ @Rule
+ public DistributedReference<ServerLauncher> serverLauncher = new DistributedReference<>();
+ @Rule
+ public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+ @Rule
public transient GfshCommandRule gfsh = new GfshCommandRule();
@Before
public void setUp() throws Exception {
- locator1VM = getVM(1);
- locator2VM = getVM(-1);
- serverVM = getVM(0);
+ locator1VM = getVM(0);
+ locator2VM = getVM(1);
+ serverVM = getVM(2);
- locator1Name = "locator1";
- locator2Name = "locator2";
- serverName = "server1";
- File locator1Dir = temporaryFolder.newFolder(locator1Name);
- File locator2Dir = temporaryFolder.newFolder(locator2Name);
- File serverDir = temporaryFolder.newFolder(serverName);
+ File locator1Dir = temporaryFolder.newFolder(LOCATOR_1_NAME);
+ File locator2Dir = temporaryFolder.newFolder(LOCATOR_2_NAME);
+ File serverDir = temporaryFolder.newFolder(SERVER_NAME);
int[] port = getRandomAvailableTCPPorts(4);
locator1Port = port[0];
@@ -144,18 +150,19 @@
locators = "localhost[" + locator1Port + "],localhost[" + locator2Port + "]";
locator1VM.invoke(() -> {
- startLocator(locator1Name, locator1Dir, locator1Port, locator1JmxPort, locators);
+ locatorLauncher.set(
+ startLocator(LOCATOR_1_NAME, locator1Dir, locator1Port, locator1JmxPort, locators));
});
locator2VM.invoke(() -> {
- startLocator(locator2Name, locator2Dir, locator2Port, locator2JmxPort, locators);
+ locatorLauncher.set(
+ startLocator(LOCATOR_2_NAME, locator2Dir, locator2Port, locator2JmxPort, locators));
});
- serverVM.invoke(() -> startServer(serverName, serverDir, locators));
+ serverVM.invoke(() -> serverLauncher.set(startServer(serverDir, locators)));
gfsh.connectAndVerify(locator1JmxPort, PortType.jmxManager);
- regionName = "region1";
- String createRegionCommand = "create region --type=REPLICATE --name=" + SEPARATOR + regionName;
+ String createRegionCommand = "create region --type=REPLICATE --name=" + SEPARATOR + REGION_NAME;
gfsh.executeAndAssertThat(createRegionCommand).statusIsSuccess();
addIgnoredException(AlertingIOException.class);
@@ -168,37 +175,35 @@
mxbeansOnServer = serverVM.invoke(() -> {
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on server1")
- .containsAll(expectedServerMXBeans(serverName, regionName));
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on server")
+ .containsAll(EXPECTED_SERVER_MXBEANS);
});
- return getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null);
+ return getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL);
});
mxbeansOnLocator1 = locator1VM.invoke(() -> {
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on locator1")
- .containsAll(expectedServerMXBeans(serverName, regionName))
- .containsAll(expectedLocatorMXBeans(locator1Name))
- .containsAll(expectedLocatorMXBeans(locator2Name))
- .containsAll(expectedDistributedMXBeans(regionName));
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on locator1")
+ .containsAll(EXPECTED_SERVER_MXBEANS)
+ .containsAll(EXPECTED_LOCATOR_1_MXBEANS)
+ .containsAll(EXPECTED_LOCATOR_2_MXBEANS)
+ .containsAll(EXPECTED_DISTRIBUTED_MXBEANS);
});
-
- return getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null);
+ return getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL);
});
mxbeansOnLocator2 = locator2VM.invoke(() -> {
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on locator2")
- .containsAll(expectedServerMXBeans(serverName, regionName))
- .containsAll(expectedLocatorMXBeans(locator2Name))
- .containsAll(expectedLocatorMXBeans(locator1Name))
- .containsAll(expectedDistributedMXBeans(regionName));
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on locator2")
+ .containsAll(EXPECTED_SERVER_MXBEANS)
+ .containsAll(EXPECTED_LOCATOR_2_MXBEANS)
+ .containsAll(EXPECTED_LOCATOR_1_MXBEANS)
+ .containsAll(EXPECTED_DISTRIBUTED_MXBEANS);
});
-
- return getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null);
+ return getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL);
});
}
@@ -207,19 +212,16 @@
invokeInEveryVM(() -> {
BEFORE.get().countDown();
AFTER.get().countDown();
- SERVER.getAndSet(DUMMY_SERVER).stop();
- LOCATOR.getAndSet(DUMMY_LOCATOR).stop();
});
- disconnectAllFromDS();
}
@Test
public void serverHasMemberTypeMXBeans() {
serverVM.invoke(() -> {
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on server1")
- .containsAll(expectedServerMXBeans(serverName, regionName));
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on server")
+ .containsAll(EXPECTED_SERVER_MXBEANS);
});
});
}
@@ -229,9 +231,9 @@
for (VM locatorVM : toArray(locator1VM, locator2VM)) {
locatorVM.invoke(() -> {
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on server1")
- .containsAll(expectedServerMXBeans(serverName, regionName));
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on server")
+ .containsAll(EXPECTED_SERVER_MXBEANS);
});
});
}
@@ -241,17 +243,17 @@
public void locatorHasMemberTypeMXBeansForBothLocators() {
locator1VM.invoke(() -> {
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on locator1")
- .containsAll(expectedLocatorMXBeans(locator1Name));
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on locator1")
+ .containsAll(EXPECTED_LOCATOR_1_MXBEANS);
});
});
locator2VM.invoke(() -> {
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on locator2")
- .containsAll(expectedLocatorMXBeans(locator2Name));
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on locator2")
+ .containsAll(EXPECTED_LOCATOR_2_MXBEANS);
});
});
}
@@ -261,9 +263,9 @@
for (VM locatorVM : toArray(locator1VM, locator2VM)) {
locatorVM.invoke(() -> {
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on locator" + getVMId())
- .containsAll(expectedDistributedMXBeans(regionName));
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on locator" + getVMId())
+ .containsAll(EXPECTED_DISTRIBUTED_MXBEANS);
});
});
}
@@ -275,29 +277,30 @@
@Test
public void serverMXBeansOnServerAreUnaffectedByLocatorCrash() {
locator1VM.invoke(() -> {
- crashDistributedSystem(LOCATOR.get().getCache().getDistributedSystem());
+ crashDistributedSystem(locatorLauncher.get().getCache().getDistributedSystem());
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on locator1")
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on locator1")
.isEmpty();
});
});
serverVM.invoke(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on server1")
- .containsExactlyElementsOf(mxbeansOnServer);
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on server")
+ .containsExactlyInAnyOrderElementsOf(mxbeansOnServer);
});
locator1VM.invoke(() -> {
- InternalLocator locator = (InternalLocator) LOCATOR.get().getLocator();
+ InternalLocator locator = (InternalLocator) locatorLauncher.get().getLocator();
await().untilAsserted(() -> {
boolean isReconnected = locator.isReconnected();
boolean isSharedConfigurationRunning = locator.isSharedConfigurationRunning();
Set<ObjectName> mbeanNames =
- getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null);
+ getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL);
+
assertThat(isReconnected)
.as("Locator is reconnected on locator1")
.isTrue();
@@ -305,15 +308,15 @@
.as("Locator shared configuration is running on locator1")
.isTrue();
assertThat(mbeanNames)
- .as("GemFire mbeans on locator1")
+ .as("GemFire mxbeans on locator1")
.isEqualTo(mxbeansOnLocator1);
});
});
serverVM.invoke(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on server1")
- .containsExactlyElementsOf(mxbeansOnServer);
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on server")
+ .containsExactlyInAnyOrderElementsOf(mxbeansOnServer);
});
}
@@ -323,48 +326,48 @@
@Test
public void serverMXBeansOnLocatorAreRestoredAfterCrashedServerReturns() {
serverVM.invoke(() -> {
- crashDistributedSystem(SERVER.get().getCache().getDistributedSystem());
+ crashDistributedSystem(serverLauncher.get().getCache().getDistributedSystem());
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on server1")
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on server")
.isEmpty();
});
});
locator1VM.invoke(() -> {
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on locator1")
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on locator1")
.doesNotContainAnyElementsOf(mxbeansOnServer);
});
});
serverVM.invoke(() -> {
- InternalCache cache = (InternalCache) SERVER.get().getCache();
+ InternalCache cache = (InternalCache) serverLauncher.get().getCache();
InternalDistributedSystem system = cache.getInternalDistributedSystem();
await().untilAsserted(() -> {
assertThat(system.isReconnecting())
- .as("System is reconnecting on server1")
+ .as("System is reconnecting on server")
.isTrue();
});
- system.waitUntilReconnected(TIMEOUT_MILLIS, MILLISECONDS);
+ system.waitUntilReconnected(getTimeout().toMillis(), MILLISECONDS);
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on server1")
- .containsExactlyElementsOf(mxbeansOnServer);
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on server")
+ .containsExactlyInAnyOrderElementsOf(mxbeansOnServer);
});
});
locator1VM.invoke(() -> {
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on locator1")
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on locator1")
.containsAll(mxbeansOnServer)
- .containsExactlyElementsOf(mxbeansOnLocator1);
+ .containsExactlyInAnyOrderElementsOf(mxbeansOnLocator1);
});
});
}
@@ -384,31 +387,31 @@
@Override
public void reconnecting(InternalDistributedSystem oldSystem) {
try {
- BEFORE.get().await(TIMEOUT_MILLIS, MILLISECONDS);
+ BEFORE.get().await(getTimeout().toMillis(), MILLISECONDS);
} catch (InterruptedException e) {
errorCollector.addError(e);
}
}
});
- crashDistributedSystem(LOCATOR.get().getCache().getDistributedSystem());
+ crashDistributedSystem(locatorLauncher.get().getCache().getDistributedSystem());
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on locator1")
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on locator1")
.isEmpty();
});
});
locator2VM.invoke(() -> {
Collection<ObjectName> locator1MXBeans = new ArrayList<>(mxbeansOnLocator1);
- locator1MXBeans.removeAll(expectedServerMXBeans(serverName, regionName));
- locator1MXBeans.removeAll(expectedLocatorMXBeans(locator2Name));
- locator1MXBeans.removeAll(expectedDistributedMXBeans(regionName));
+ locator1MXBeans.removeAll(EXPECTED_SERVER_MXBEANS);
+ locator1MXBeans.removeAll(EXPECTED_LOCATOR_2_MXBEANS);
+ locator1MXBeans.removeAll(EXPECTED_DISTRIBUTED_MXBEANS);
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on locator2")
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on locator2")
.doesNotContainAnyElementsOf(locator1MXBeans);
});
});
@@ -416,14 +419,14 @@
locator1VM.invoke(() -> {
BEFORE.get().countDown();
- await().untilAsserted(() -> {
- InternalLocator locator = (InternalLocator) LOCATOR.get().getLocator();
+ InternalLocator locator = (InternalLocator) locatorLauncher.get().getLocator();
+ await().untilAsserted(() -> {
assertThat(locator.isSharedConfigurationRunning())
.as("Locator shared configuration is running on locator1")
.isTrue();
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on locator1")
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on locator1")
.containsAll(mxbeansOnLocator1);
});
});
@@ -445,7 +448,7 @@
@Override
public void reconnecting(InternalDistributedSystem oldSystem) {
try {
- BEFORE.get().await(TIMEOUT_MILLIS, MILLISECONDS);
+ BEFORE.get().await(getTimeout().toMillis(), MILLISECONDS);
} catch (InterruptedException e) {
errorCollector.addError(e);
}
@@ -458,11 +461,11 @@
}
});
- crashDistributedSystem(SERVER.get().getCache().getDistributedSystem());
+ crashDistributedSystem(serverLauncher.get().getCache().getDistributedSystem());
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on server1")
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on server")
.isEmpty();
});
});
@@ -470,8 +473,8 @@
for (VM locatorVM : toArray(locator1VM, locator2VM)) {
locatorVM.invoke(() -> {
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on locator" + locatorVM.getId())
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on locator" + locatorVM.getId())
.isNotEmpty()
.doesNotContainAnyElementsOf(mxbeansOnServer);
});
@@ -480,35 +483,35 @@
serverVM.invoke(() -> {
BEFORE.get().countDown();
- AFTER.get().await(TIMEOUT_MILLIS, MILLISECONDS);
+ AFTER.get().await(getTimeout().toMillis(), MILLISECONDS);
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on server1")
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on server")
.isEqualTo(mxbeansOnServer);
});
});
locator1VM.invoke(() -> {
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on locator1")
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on locator1")
.containsAll(mxbeansOnLocator1);
});
});
locator2VM.invoke(() -> {
await().untilAsserted(() -> {
- assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
- .as("GemFire mbeans on locator2")
+ assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL))
+ .as("GemFire mxbeans on locator2")
.containsAll(mxbeansOnLocator2);
});
});
}
- private static void startLocator(String name, File workingDirectory, int locatorPort, int jmxPort,
- String locators) {
- LOCATOR.set(new LocatorLauncher.Builder()
+ private static LocatorLauncher startLocator(String name, File workingDirectory, int locatorPort,
+ int jmxPort, String locators) {
+ LocatorLauncher locatorLauncher = new LocatorLauncher.Builder()
.setMemberName(name)
.setPort(locatorPort)
.setWorkingDirectory(workingDirectory.getAbsolutePath())
@@ -520,40 +523,44 @@
.set(LOG_FILE, new File(workingDirectory, name + ".log").getAbsolutePath())
.set(MAX_WAIT_TIME_RECONNECT, "1000")
.set(MEMBER_TIMEOUT, "2000")
- .build());
+ .build();
- LOCATOR.get().start();
+ locatorLauncher.start();
+
+ InternalLocator locator = (InternalLocator) locatorLauncher.getLocator();
await().untilAsserted(() -> {
- InternalLocator locator = (InternalLocator) LOCATOR.get().getLocator();
assertThat(locator.isSharedConfigurationRunning())
.as("Locator shared configuration is running on locator" + getVMId())
.isTrue();
});
+
+ return locatorLauncher;
}
- private static void startServer(String name, File workingDirectory, String locators) {
- SERVER.set(new ServerLauncher.Builder()
+ private static ServerLauncher startServer(File workingDirectory, String locators) {
+ ServerLauncher serverLauncher = new ServerLauncher.Builder()
.setDisableDefaultServer(true)
- .setMemberName(name)
+ .setMemberName(SERVER_NAME)
.setWorkingDirectory(workingDirectory.getAbsolutePath())
.set(HTTP_SERVICE_PORT, "0")
.set(LOCATORS, locators)
- .set(LOG_FILE, new File(workingDirectory, name + ".log").getAbsolutePath())
+ .set(LOG_FILE, new File(workingDirectory, SERVER_NAME + ".log").getAbsolutePath())
.set(MAX_WAIT_TIME_RECONNECT, "1000")
.set(MEMBER_TIMEOUT, "2000")
- .build());
+ .build();
- SERVER.get().start();
+ serverLauncher.start();
+
+ return serverLauncher;
}
private static Set<ObjectName> expectedServerMXBeans(String memberName, String regionName)
throws MalformedObjectNameException {
return new HashSet<>(asList(
getInstance("GemFire:type=Member,member=" + memberName),
- getInstance(
- "GemFire:service=Region,name=" + SEPARATOR + regionName + ",type=Member,member=" +
- memberName)));
+ getInstance("GemFire:service=Region,name=" + SEPARATOR + regionName +
+ ",type=Member,member=" + memberName)));
}
private static Set<ObjectName> expectedLocatorMXBeans(String memberName)
@@ -577,4 +584,12 @@
getInstance("GemFire:service=Region,name=" + SEPARATOR + regionName + ",type=Distributed"),
getInstance("GemFire:service=System,type=Distributed")));
}
+
+ private static <V> V execute(Callable<V> task) {
+ try {
+ return task.call();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java
index 36cc35d..60faef5 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java
@@ -178,7 +178,7 @@
(SystemManagementService) ManagementService.getManagementService(cache);
service.startManager();
FederatingManager federatingManager = service.getFederatingManager();
- proxyFactory = federatingManager.getProxyFactory();
+ proxyFactory = federatingManager.proxyFactory();
return locatorLauncher.getPort();
}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerConcurrencyIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerConcurrencyIntegrationTest.java
index 2efe96e..d9dc552 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerConcurrencyIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerConcurrencyIntegrationTest.java
@@ -82,7 +82,7 @@
await().until(() -> !cache.getAllRegions().isEmpty());
- assertThat(federatingManager.getAndResetLatestException()).isNull();
+ assertThat(federatingManager.latestException()).isNull();
}
private InternalDistributedMember member() throws UnknownHostException {
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java
deleted file mode 100644
index db6b2dd..0000000
--- a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.management.internal;
-
-import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.quality.Strictness.STRICT_STUBS;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.Executors;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnit;
-import org.mockito.junit.MockitoRule;
-
-import org.apache.geode.StatisticsFactory;
-import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.InternalCacheForClientAccess;
-import org.apache.geode.internal.statistics.StatisticsClock;
-import org.apache.geode.test.junit.categories.JMXTest;
-
-@Category(JMXTest.class)
-public class FederatingManagerIntegrationTest {
- @Rule
- public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
-
- @Mock
- public InternalCache cache;
- @Mock
- public InternalCacheForClientAccess cacheForClientAccess;
- @Mock
- public MBeanProxyFactory proxyFactory;
- @Mock
- public MemberMessenger messenger;
- @Mock
- public ManagementResourceRepo repo;
- @Mock
- public SystemManagementService service;
- @Mock
- public StatisticsFactory statisticsFactory;
- @Mock
- public StatisticsClock statisticsClock;
- @Mock
- public InternalDistributedSystem system;
- @Mock
- public DistributionManager distributionManager;
-
- @Before
- public void setUp() throws IOException, ClassNotFoundException {
- when(cache.getCacheForProcessingClientRequests())
- .thenReturn(cacheForClientAccess);
- when(system.getDistributionManager())
- .thenReturn(distributionManager);
- }
-
- @Test
- public void restartDoesNotThrowIfOtherMembersExist() {
- when(distributionManager.getOtherDistributionManagerIds())
- .thenReturn(Collections.singleton(mock(InternalDistributedMember.class)));
-
- FederatingManager federatingManager =
- new FederatingManager(repo, system, service, cache, statisticsFactory,
- statisticsClock, proxyFactory, messenger, Executors::newSingleThreadExecutor);
-
- federatingManager.startManager();
- federatingManager.stopManager();
-
- assertThatCode(federatingManager::startManager)
- .doesNotThrowAnyException();
- }
-}
diff --git a/geode-core/src/main/java/org/apache/geode/logging/internal/LoggingSession.java b/geode-core/src/main/java/org/apache/geode/logging/internal/LoggingSession.java
index 1af2974..cadbe26 100644
--- a/geode-core/src/main/java/org/apache/geode/logging/internal/LoggingSession.java
+++ b/geode-core/src/main/java/org/apache/geode/logging/internal/LoggingSession.java
@@ -35,8 +35,8 @@
*/
public class LoggingSession implements InternalSessionContext {
- static final boolean STANDARD_OUTPUT_ALWAYS_ON =
- Boolean.valueOf(System.getProperty(GEMFIRE_PREFIX + "standard-output-always-on", "false"));
+ private static final boolean STANDARD_OUTPUT_ALWAYS_ON =
+ Boolean.getBoolean(GEMFIRE_PREFIX + "standard-output-always-on");
private static final Logger logger = LogService.getLogger();
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
index 714c9cb..3ba7c21 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
@@ -20,11 +20,13 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import javax.management.Notification;
@@ -63,23 +65,26 @@
*
* @since GemFire 7.0
*/
-public class FederatingManager extends Manager {
+public class FederatingManager extends Manager implements ManagerMembership {
private static final Logger logger = LogService.getLogger();
- private final SystemManagementService service;
- private final AtomicReference<Exception> latestException = new AtomicReference<>();
-
- private final Supplier<ExecutorService> executorServiceSupplier;
- private final MBeanProxyFactory proxyFactory;
- private final MemberMessenger messenger;
-
/**
* This Executor uses a pool of thread to execute the member addition /removal tasks, This will
* utilize the processing powers available. Going with unbounded queue because tasks wont be
* unbounded in practical situation as number of members will be a finite set at any given point
* of time
*/
- private ExecutorService executorService;
+ private final AtomicReference<ExecutorService> executorService = new AtomicReference<>();
+ private final AtomicReference<Exception> latestException = new AtomicReference<>();
+ private final List<Runnable> pendingTasks = new CopyOnWriteArrayList<>();
+
+ private final SystemManagementService service;
+ private final Supplier<ExecutorService> executorServiceSupplier;
+ private final MBeanProxyFactory proxyFactory;
+ private final MemberMessenger messenger;
+ private final ReentrantLock lifecycleLock;
+
+ private volatile boolean starting;
@VisibleForTesting
FederatingManager(ManagementResourceRepo repo, InternalDistributedSystem system,
@@ -99,6 +104,7 @@
this.proxyFactory = proxyFactory;
this.messenger = messenger;
this.executorServiceSupplier = executorServiceSupplier;
+ lifecycleLock = new ReentrantLock();
}
/**
@@ -108,31 +114,69 @@
@Override
public synchronized void startManager() {
try {
- if (logger.isDebugEnabled()) {
- logger.debug("Starting the Federating Manager.... ");
+ lifecycleLock.lock();
+ try {
+ if (starting || running) {
+ return;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Starting the Federating Manager.... ");
+ }
+ starting = true;
+ executorService.set(executorServiceSupplier.get());
+ running = true;
+ } finally {
+ lifecycleLock.unlock();
}
- executorService = executorServiceSupplier.get();
-
- running = true;
startManagingActivity();
+
+ lifecycleLock.lock();
+ try {
+ for (Runnable task : pendingTasks) {
+ executeTask(task);
+ }
+ } finally {
+ pendingTasks.clear();
+ starting = false;
+ lifecycleLock.unlock();
+ }
+
messenger.broadcastManagerInfo();
+
} catch (Exception e) {
- running = false;
+ cleanupFailedStart();
throw new ManagementException(e);
}
}
+ private void cleanupFailedStart() {
+ lifecycleLock.lock();
+ try {
+ pendingTasks.clear();
+ running = false;
+ starting = false;
+ } finally {
+ lifecycleLock.unlock();
+ }
+ }
+
@Override
public synchronized void stopManager() {
- // remove hidden management regions and federatedMBeans
- if (!running) {
- return;
+ lifecycleLock.lock();
+ try {
+ // remove hidden management regions and federatedMBeans
+ if (!running) {
+ return;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stopping the Federating Manager.... ");
+ }
+ running = false;
+ } finally {
+ lifecycleLock.unlock();
}
- running = false;
- if (logger.isDebugEnabled()) {
- logger.debug("Stopping the Federating Manager.... ");
- }
+
stopManagingActivity();
}
@@ -141,35 +185,56 @@
return running;
}
- public MemberMessenger getMessenger() {
- return messenger;
- }
-
/**
- * This method will be invoked from MembershipListener which is registered when the member becomes
- * a Management node.
- *
- * <p>
* This method will delegate task to another thread and exit, so that it wont block the membership
* listener
*/
- void removeMember(DistributedMember member, boolean crashed) {
- executeTask(new RemoveMemberTask(member, crashed));
+ @Override
+ public void addMember(InternalDistributedMember member) {
+ lifecycleLock.lock();
+ try {
+ if (!running) {
+ return;
+ }
+ executeTask(() -> new AddMemberTask(member).call());
+ } finally {
+ lifecycleLock.unlock();
+ }
}
/**
- * This method will be invoked from MembershipListener which is registered when the member becomes
- * a Management node.
- *
- * <p>
+ * This method will delegate task to another thread and exit, so that it wont block the membership
+ * listener
+ */
+ @Override
+ public void removeMember(DistributedMember member, boolean crashed) {
+ lifecycleLock.lock();
+ try {
+ Runnable task = new RemoveMemberTask(member, crashed);
+ if (starting) {
+ pendingTasks.add(task);
+ } else if (running) {
+ executeTask(task);
+ }
+ } finally {
+ lifecycleLock.unlock();
+ }
+ }
+
+ /**
* this method will delegate task to another thread and exit, so that it wont block the membership
* listener
*/
- void suspectMember(DistributedMember member, InternalDistributedMember whoSuspected,
+ @Override
+ public void suspectMember(DistributedMember member, InternalDistributedMember whoSuspected,
String reason) {
service.memberSuspect((InternalDistributedMember) member, whoSuspected, reason);
}
+ public MemberMessenger getMessenger() {
+ return messenger;
+ }
+
/**
* This will return the last updated time of the proxyMBean.
*
@@ -207,30 +272,6 @@
}
/**
- * This method will be invoked whenever a member stops being a managing node. The
- * {@code ManagementException} has to be handled by the caller.
- */
- private void stopManagingActivity() {
- try {
- executorService.shutdownNow();
-
- for (DistributedMember distributedMember : repo.getMonitoringRegionMap().keySet()) {
- removeMemberArtifacts(distributedMember, false);
- }
- } catch (Exception e) {
- throw new ManagementException(e);
- }
- }
-
- private synchronized void executeTask(Runnable task) {
- try {
- executorService.execute(task);
- } catch (RejectedExecutionException ignored) {
- // Ignore, we are getting shutdown
- }
- }
-
- /**
* This method will be invoked when a node transitions from managed node to managing node This
* method will block for all GIIs to be completed But each GII is given a specific time frame.
* After that the task will be marked as cancelled.
@@ -242,7 +283,7 @@
for (InternalDistributedMember member : system.getDistributionManager()
.getOtherDistributionManagerIds()) {
- giiTaskList.add(new GIITask(member));
+ giiTaskList.add(new AddMemberTask(member));
}
try {
@@ -250,7 +291,7 @@
logger.debug("Management Resource creation started : ");
}
List<Future<InternalDistributedMember>> futureTaskList =
- executorService.invokeAll(giiTaskList);
+ executorService.get().invokeAll(giiTaskList);
for (Future<InternalDistributedMember> futureTask : futureTaskList) {
try {
@@ -296,76 +337,27 @@
}
/**
- * This method will be invoked from MembershipListener which is registered when the member becomes
- * a Management node.
- *
- * <p>
- * This method will delegate task to another thread and exit, so that it wont block the membership
- * listener
+ * This method will be invoked whenever a member stops being a managing node. The
+ * {@code ManagementException} has to be handled by the caller.
*/
- @VisibleForTesting
- void addMember(InternalDistributedMember member) {
- GIITask giiTask = new GIITask(member);
- executeTask(() -> {
- try {
- giiTask.call();
- } catch (RuntimeException e) {
- logger.warn("Error federating new member {}", member.getId(), e);
- latestException.set(e);
+ private void stopManagingActivity() {
+ try {
+ executorService.get().shutdownNow();
+
+ for (DistributedMember distributedMember : repo.getMonitoringRegionMap().keySet()) {
+ removeMemberArtifacts(distributedMember, false);
}
- });
- }
-
- @VisibleForTesting
- void removeMemberArtifacts(DistributedMember member, boolean crashed) {
- Region<String, Object> monitoringRegion = repo.getEntryFromMonitoringRegionMap(member);
- Region<NotificationKey, Notification> notificationRegion =
- repo.getEntryFromNotifRegionMap(member);
-
- if (monitoringRegion == null && notificationRegion == null) {
- return;
- }
-
- repo.romoveEntryFromMonitoringRegionMap(member);
- repo.removeEntryFromNotifRegionMap(member);
-
- // If cache is closed all the regions would have been destroyed implicitly
- if (!cache.isClosed()) {
- try {
- if (monitoringRegion != null) {
- proxyFactory.removeAllProxies(member, monitoringRegion);
- monitoringRegion.localDestroyRegion();
- }
- } catch (CancelException | RegionDestroyedException ignore) {
- // ignored
- }
-
- try {
- if (notificationRegion != null) {
- notificationRegion.localDestroyRegion();
- }
- } catch (CancelException | RegionDestroyedException ignore) {
- // ignored
- }
- }
-
- if (!system.getDistributedMember().equals(member)) {
- try {
- service.memberDeparted((InternalDistributedMember) member, crashed);
- } catch (CancelException | RegionDestroyedException ignore) {
- // ignored
- }
+ } catch (Exception e) {
+ throw new ManagementException(e);
}
}
- @VisibleForTesting
- public MBeanProxyFactory getProxyFactory() {
- return proxyFactory;
- }
-
- @VisibleForTesting
- synchronized Exception getAndResetLatestException() {
- return latestException.getAndSet(null);
+ private synchronized void executeTask(Runnable task) {
+ try {
+ executorService.get().execute(task);
+ } catch (RejectedExecutionException ignored) {
+ // Ignore, we are getting shutdown
+ }
}
@VisibleForTesting
@@ -503,19 +495,80 @@
}
}
+ @VisibleForTesting
+ void removeMemberArtifacts(DistributedMember member, boolean crashed) {
+ Region<String, Object> monitoringRegion = repo.getEntryFromMonitoringRegionMap(member);
+ Region<NotificationKey, Notification> notificationRegion =
+ repo.getEntryFromNotifRegionMap(member);
+
+ if (monitoringRegion == null && notificationRegion == null) {
+ return;
+ }
+
+ repo.romoveEntryFromMonitoringRegionMap(member);
+ repo.removeEntryFromNotifRegionMap(member);
+
+ // If cache is closed all the regions would have been destroyed implicitly
+ if (!cache.isClosed()) {
+ try {
+ if (monitoringRegion != null) {
+ proxyFactory.removeAllProxies(member, monitoringRegion);
+ monitoringRegion.localDestroyRegion();
+ }
+ } catch (CancelException | RegionDestroyedException ignore) {
+ // ignored
+ }
+
+ try {
+ if (notificationRegion != null) {
+ notificationRegion.localDestroyRegion();
+ }
+ } catch (CancelException | RegionDestroyedException ignore) {
+ // ignored
+ }
+ }
+
+ if (!system.getDistributedMember().equals(member)) {
+ try {
+ service.memberDeparted((InternalDistributedMember) member, crashed);
+ } catch (CancelException | RegionDestroyedException ignore) {
+ // ignored
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public MBeanProxyFactory proxyFactory() {
+ return proxyFactory;
+ }
+
+ @VisibleForTesting
+ Exception latestException() {
+ return latestException.getAndSet(null);
+ }
+
+ @VisibleForTesting
+ List<Runnable> pendingTasks() {
+ return pendingTasks;
+ }
+
+ @VisibleForTesting
+ boolean isStarting() {
+ return starting;
+ }
+
/**
- * Actual task of doing the GII
+ * Actual task of adding a member.
*
* <p>
- * It will perform the GII request which might originate from TransitionListener or Membership
- * Listener.
+ * Perform the GII request which might originate from transition listener or membership listener.
*
* <p>
- * Managing Node side resources are created per member which is visible to this node:
+ * Manager resources are created per member which is visible to this node:
*
* <pre>
- * 1)Management Region : its a Replicated NO_ACK region
- * 2)Notification Region : its a Replicated Proxy NO_ACK region
+ * 1) Management Region : its a Replicated NO_ACK region
+ * 2) Notification Region : its a Replicated Proxy NO_ACK region
* </pre>
*
* <p>
@@ -528,13 +581,13 @@
*
* <p>
* This task can be cancelled from the calling thread if a timeout happens. In that case we have
- * to handle the thread interrupt
+ * to handle the thread interrupt.
*/
- private class GIITask implements Callable<InternalDistributedMember> {
+ private class AddMemberTask implements Callable<InternalDistributedMember> {
private final InternalDistributedMember member;
- private GIITask(InternalDistributedMember member) {
+ private AddMemberTask(InternalDistributedMember member) {
this.member = member;
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/LocalManager.java b/geode-core/src/main/java/org/apache/geode/management/internal/LocalManager.java
index b67d8f5..b28af5b 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/LocalManager.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/LocalManager.java
@@ -14,12 +14,15 @@
*/
package org.apache.geode.management.internal;
+import static org.apache.geode.logging.internal.executors.LoggingExecutors.newSingleThreadScheduledExecutor;
+
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import javax.management.MalformedObjectNameException;
import javax.management.Notification;
@@ -43,7 +46,6 @@
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionFactory;
import org.apache.geode.internal.statistics.StatisticsClock;
-import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.management.ManagementException;
@@ -61,13 +63,14 @@
/**
* Management Task pushes data to the admin regions
*/
- private ManagementTask managementTask;
+ private final AtomicReference<ManagementTask> managementTask = new AtomicReference<>();
/**
* This service will be responsible for executing ManagementTasks and periodically push data to
* localMonitoringRegion
*/
- private ScheduledExecutorService singleThreadFederationScheduler;
+ private final AtomicReference<ScheduledExecutorService> singleThreadFederationScheduler =
+ new AtomicReference<>();
/**
* This map holds all the components which are eligible for federation. Although filters might
@@ -103,8 +106,7 @@
if (repo.getLocalMonitoringRegion() != null) {
return;
}
- singleThreadFederationScheduler =
- LoggingExecutors.newSingleThreadScheduledExecutor("Management Task");
+ singleThreadFederationScheduler.set(newSingleThreadScheduledExecutor("Management Task"));
if (logger.isDebugEnabled()) {
logger.debug("Creating Management Region :");
@@ -165,14 +167,14 @@
}
}
- managementTask = new ManagementTask();
+ managementTask.set(new ManagementTask());
// call run to get us initialized immediately with a sync call
- managementTask.run();
+ managementTask.get().run();
// All local resources are created for the ManagementTask
// Now Management tasks can proceed.
int updateRate = system.getConfig().getJmxManagerUpdateRate();
- singleThreadFederationScheduler.scheduleAtFixedRate(managementTask, updateRate, updateRate,
- TimeUnit.MILLISECONDS);
+ singleThreadFederationScheduler.get().scheduleAtFixedRate(managementTask.get(), updateRate,
+ updateRate, TimeUnit.MILLISECONDS);
if (logger.isDebugEnabled()) {
logger.debug("Management Region created with Name : {}",
@@ -206,8 +208,9 @@
private void shutdownTasks() {
// No need of pooledGIIExecutor as this node wont do GII again
// so better to release resources
- if (singleThreadFederationScheduler != null) {
- singleThreadFederationScheduler.shutdownNow();
+ ScheduledExecutorService executor = singleThreadFederationScheduler.get();
+ if (executor != null) {
+ executor.shutdownNow();
}
}
@@ -246,7 +249,7 @@
*/
@VisibleForTesting
public ScheduledExecutorService getFederationScheduler() {
- return singleThreadFederationScheduler;
+ return singleThreadFederationScheduler.get();
}
/**
@@ -255,7 +258,7 @@
*/
@VisibleForTesting
public void runManagementTaskAdhoc() {
- managementTask.run();
+ managementTask.get().run();
}
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java b/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java
index 7a48465..2adcb47 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java
@@ -28,21 +28,11 @@
*
* @since GemFire 7.0
*/
-public abstract class Manager {
+public abstract class Manager implements ManagerLifecycle {
protected final InternalCacheForClientAccess cache;
/**
- * depicts whether this node is a Managing node or not
- */
- protected volatile boolean running;
-
- /**
- * depicts whether this node is a Managing node or not
- */
- protected volatile boolean stopCacheOps;
-
- /**
* This is a single window to manipulate region resources for management
*/
protected final ManagementResourceRepo repo;
@@ -56,8 +46,15 @@
protected final StatisticsClock statisticsClock;
- public Manager(ManagementResourceRepo repo, InternalDistributedSystem system,
- InternalCache cache, StatisticsFactory statisticsFactory, StatisticsClock statisticsClock) {
+ /**
+ * True if this node is a Geode JMX manager.
+ */
+ protected volatile boolean running;
+
+ protected volatile boolean stopCacheOps;
+
+ Manager(ManagementResourceRepo repo, InternalDistributedSystem system, InternalCache cache,
+ StatisticsFactory statisticsFactory, StatisticsClock statisticsClock) {
this.repo = repo;
this.cache = cache.getCacheForProcessingClientRequests();
this.system = system;
@@ -65,12 +62,6 @@
this.statisticsClock = statisticsClock;
}
- public abstract boolean isRunning();
-
- public abstract void startManager();
-
- public abstract void stopManager();
-
@VisibleForTesting
public ManagementResourceRepo getManagementResourceRepo() {
return repo;
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/ManagerLifecycle.java b/geode-core/src/main/java/org/apache/geode/management/internal/ManagerLifecycle.java
new file mode 100644
index 0000000..0ae8382
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/ManagerLifecycle.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal;
+
+/**
+ * Lifecycle operations for Geode JMX managers.
+ */
+public interface ManagerLifecycle {
+
+ /**
+ * Start the manager.
+ */
+ void startManager();
+
+ /**
+ * Stop the manager.
+ */
+ void stopManager();
+
+ /**
+ * Returns true if the manager is running.
+ */
+ boolean isRunning();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/ManagerMembership.java b/geode-core/src/main/java/org/apache/geode/management/internal/ManagerMembership.java
new file mode 100644
index 0000000..845ad55
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/ManagerMembership.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal;
+
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+
+/**
+ * Membership operations for Geode JMX managers.
+ */
+public interface ManagerMembership {
+
+ /**
+ * This method will be invoked from MembershipListener which is registered when the member becomes
+ * a Management node.
+ */
+ void addMember(InternalDistributedMember member);
+
+ /**
+ * This method will be invoked from MembershipListener which is registered when the member becomes
+ * a Management node.
+ */
+ void removeMember(DistributedMember member, boolean crashed);
+
+ /**
+ * This method will be invoked from MembershipListener which is registered when the member becomes
+ * a Management node.
+ */
+ void suspectMember(DistributedMember member, InternalDistributedMember whoSuspected,
+ String reason);
+}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java
index 27588c2..10a2f75 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java
@@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -283,6 +284,8 @@
}
if (mapOfMembers != null) {
+ Objects.requireNonNull(objectName);
+ Objects.requireNonNull(proxy);
mapOfMembers.put(objectName, proxy);
memberSetSize = mapOfMembers.values().size();
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java
index 2de1c51..3fda782 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java
@@ -14,23 +14,37 @@
*/
package org.apache.geode.management.internal;
+import static java.util.Collections.singleton;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentCaptor.forClass;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.powermock.api.mockito.PowerMockito.when;
+import static org.mockito.Mockito.when;
import java.net.InetAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.ErrorCollector;
import org.mockito.ArgumentCaptor;
import org.apache.geode.StatisticsFactory;
@@ -48,17 +62,17 @@
import org.apache.geode.internal.cache.InternalRegionFactory;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.management.DistributedSystemMXBean;
+import org.apache.geode.management.ManagementException;
import org.apache.geode.test.junit.categories.JMXTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
@Category(JMXTest.class)
public class FederatingManagerTest {
private InternalCache cache;
- private InternalCacheForClientAccess cacheForClientAccess;
private ExecutorService executorService;
- private MBeanJMXAdapter jmxAdapter;
- private MBeanProxyFactory proxyFactory;
private MemberMessenger messenger;
+ private MBeanProxyFactory proxyFactory;
private ManagementResourceRepo repo;
private SystemManagementService service;
private StatisticsFactory statisticsFactory;
@@ -67,12 +81,15 @@
private InternalRegionFactory regionFactory1;
private InternalRegionFactory regionFactory2;
+ @Rule
+ public ErrorCollector errorCollector = new ErrorCollector();
+ @Rule
+ public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
@Before
- public void setUp() throws Exception {
+ public void setUp() {
cache = mock(InternalCache.class);
- cacheForClientAccess = mock(InternalCacheForClientAccess.class);
executorService = mock(ExecutorService.class);
- jmxAdapter = mock(MBeanJMXAdapter.class);
messenger = mock(MemberMessenger.class);
proxyFactory = mock(MBeanProxyFactory.class);
repo = mock(ManagementResourceRepo.class);
@@ -83,14 +100,19 @@
regionFactory1 = mock(InternalRegionFactory.class);
regionFactory2 = mock(InternalRegionFactory.class);
+ InternalCacheForClientAccess cacheForClientAccess = mock(InternalCacheForClientAccess.class);
DistributedSystemMXBean distributedSystemMXBean = mock(DistributedSystemMXBean.class);
+ MBeanJMXAdapter jmxAdapter = mock(MBeanJMXAdapter.class);
when(cache.getCacheForProcessingClientRequests())
.thenReturn(cacheForClientAccess);
- when(cacheForClientAccess.createInternalRegionFactory()).thenReturn(regionFactory1)
+ when(cacheForClientAccess.createInternalRegionFactory())
+ .thenReturn(regionFactory1)
.thenReturn(regionFactory2);
- when(regionFactory1.create(any())).thenReturn(mock(Region.class));
- when(regionFactory2.create(any())).thenReturn(mock(Region.class));
+ when(regionFactory1.create(any()))
+ .thenReturn(mock(Region.class));
+ when(regionFactory2.create(any()))
+ .thenReturn(mock(Region.class));
when(distributedSystemMXBean.getAlertLevel())
.thenReturn(AlertLevel.WARNING.name());
when(jmxAdapter.getDistributedSystemMXBean())
@@ -100,9 +122,10 @@
}
@Test
- public void addMemberArtifactsCreatesMonitoringRegion() throws Exception {
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ public void addMemberArtifactsCreatesMonitoringRegion() {
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
federatingManager.startManager();
federatingManager.addMemberArtifacts(member(1, 20));
@@ -111,23 +134,24 @@
}
@Test
- public void addMemberArtifactsCreatesMonitoringRegionWithHasOwnStats() throws Exception {
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ public void addMemberArtifactsCreatesMonitoringRegionWithHasOwnStats() {
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
federatingManager.startManager();
federatingManager.addMemberArtifacts(member(2, 40));
- ArgumentCaptor<HasCachePerfStats> captor =
- ArgumentCaptor.forClass(HasCachePerfStats.class);
+ ArgumentCaptor<HasCachePerfStats> captor = forClass(HasCachePerfStats.class);
verify(regionFactory1).setCachePerfStatsHolder(captor.capture());
assertThat(captor.getValue().hasOwnStats()).isTrue();
}
@Test
public void addMemberArtifactsCreatesNotificationRegion() {
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
federatingManager.startManager();
federatingManager.addMemberArtifacts(member(3, 60));
@@ -137,14 +161,14 @@
@Test
public void addMemberArtifactsCreatesNotificationRegionWithHasOwnStats() {
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
federatingManager.startManager();
federatingManager.addMemberArtifacts(member(4, 80));
- ArgumentCaptor<HasCachePerfStats> captor =
- ArgumentCaptor.forClass(HasCachePerfStats.class);
+ ArgumentCaptor<HasCachePerfStats> captor = forClass(HasCachePerfStats.class);
verify(regionFactory2).setCachePerfStatsHolder(captor.capture());
assertThat(captor.getValue().hasOwnStats()).isTrue();
}
@@ -159,13 +183,13 @@
.thenReturn(mock(Region.class));
when(system.getDistributedMember())
.thenReturn(member);
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
federatingManager.removeMemberArtifacts(member, false);
- verify(monitoringRegion)
- .localDestroyRegion();
+ verify(monitoringRegion).localDestroyRegion();
}
@Test
@@ -178,13 +202,13 @@
.thenReturn(notificationRegion);
when(system.getDistributedMember())
.thenReturn(member);
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
federatingManager.removeMemberArtifacts(member, false);
- verify(notificationRegion)
- .localDestroyRegion();
+ verify(notificationRegion).localDestroyRegion();
}
@Test
@@ -199,13 +223,13 @@
.thenReturn(mock(Region.class));
when(system.getDistributedMember())
.thenReturn(member);
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
federatingManager.removeMemberArtifacts(member, false);
- verify(monitoringRegion)
- .localDestroyRegion();
+ verify(monitoringRegion).localDestroyRegion();
}
@Test
@@ -220,13 +244,13 @@
.thenReturn(notificationRegion);
when(system.getDistributedMember())
.thenReturn(member);
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
federatingManager.removeMemberArtifacts(member, false);
- verify(notificationRegion)
- .localDestroyRegion();
+ verify(notificationRegion).localDestroyRegion();
}
@Test
@@ -242,13 +266,13 @@
.thenReturn(mock(Region.class));
when(system.getDistributedMember())
.thenReturn(member);
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
federatingManager.removeMemberArtifacts(member, false);
- verify(proxyFactory)
- .removeAllProxies(member, monitoringRegion);
+ verify(proxyFactory).removeAllProxies(member, monitoringRegion);
}
@Test
@@ -264,13 +288,13 @@
.thenReturn(mock(Region.class));
when(system.getDistributedMember())
.thenReturn(member);
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
federatingManager.removeMemberArtifacts(member, false);
- verify(proxyFactory)
- .removeAllProxies(member, monitoringRegion);
+ verify(proxyFactory).removeAllProxies(member, monitoringRegion);
}
@Test
@@ -285,13 +309,13 @@
.thenReturn(notificationRegion);
when(system.getDistributedMember())
.thenReturn(member);
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
federatingManager.removeMemberArtifacts(member, false);
- verify(notificationRegion)
- .localDestroyRegion();
+ verify(notificationRegion).localDestroyRegion();
}
@Test
@@ -306,13 +330,13 @@
.thenReturn(mock(Region.class));
when(system.getDistributedMember())
.thenReturn(member);
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
federatingManager.removeMemberArtifacts(member, false);
- verify(monitoringRegion)
- .localDestroyRegion();
+ verify(monitoringRegion).localDestroyRegion();
}
@Test
@@ -328,13 +352,13 @@
.thenReturn(mock(Region.class));
when(system.getDistributedMember())
.thenReturn(member);
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
federatingManager.removeMemberArtifacts(member, false);
- verify(proxyFactory)
- .removeAllProxies(member, monitoringRegion);
+ verify(proxyFactory).removeAllProxies(member, monitoringRegion);
}
@Test
@@ -349,13 +373,13 @@
.thenReturn(notificationRegion);
when(system.getDistributedMember())
.thenReturn(member);
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
federatingManager.removeMemberArtifacts(member, false);
- verify(notificationRegion)
- .localDestroyRegion();
+ verify(notificationRegion).localDestroyRegion();
}
@Test
@@ -370,13 +394,13 @@
.thenReturn(mock(Region.class));
when(system.getDistributedMember())
.thenReturn(member);
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
federatingManager.removeMemberArtifacts(member, false);
- verify(monitoringRegion)
- .localDestroyRegion();
+ verify(monitoringRegion).localDestroyRegion();
}
@Test
@@ -388,13 +412,13 @@
.thenReturn(null);
when(system.getDistributedMember())
.thenReturn(member);
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
Throwable thrown = catchThrowable(() -> federatingManager.removeMemberArtifacts(member, false));
- assertThat(thrown)
- .isNull();
+ assertThat(thrown).isNull();
}
@Test
@@ -406,22 +430,23 @@
.thenReturn(mock(Region.class));
when(system.getDistributedMember())
.thenReturn(member);
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
Throwable thrown = catchThrowable(() -> federatingManager.removeMemberArtifacts(member, false));
- assertThat(thrown)
- .isNull();
+ assertThat(thrown).isNull();
}
@Test
public void startManagerGetsNewExecutorServiceFromSupplier() {
- @SuppressWarnings("unchecked")
Supplier<ExecutorService> executorServiceSupplier = mock(Supplier.class);
- when(executorServiceSupplier.get()).thenReturn(mock(ExecutorService.class));
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorServiceSupplier);
+ when(executorServiceSupplier.get())
+ .thenReturn(mock(ExecutorService.class));
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorServiceSupplier);
federatingManager.startManager();
@@ -437,23 +462,171 @@
.thenReturn(mock(Region.class));
when(system.getDistributedMember())
.thenReturn(member);
- FederatingManager federatingManager = new FederatingManager(repo, system, service, cache,
- statisticsFactory, statisticsClock, proxyFactory, messenger, executorService);
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory,
+ statisticsClock, proxyFactory, messenger, executorService);
federatingManager.removeMemberArtifacts(member, false);
verifyNoMoreInteractions(proxyFactory);
}
+ @Test
+ public void removeMemberWaitsForStartManager() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ CyclicBarrier barrier = new CyclicBarrier(2);
+ ExecutorService executorService = mock(ExecutorService.class);
+ List<Future<InternalDistributedMember>> futureTaskList = Collections.emptyList();
+
+ when(executorService.invokeAll(any())).thenAnswer(invocation -> {
+ awaitCyclicBarrier(barrier);
+ awaitCountDownLatch(latch);
+ return futureTaskList;
+ });
+
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
+
+ executorServiceRule.submit(() -> {
+ federatingManager.startManager();
+ });
+
+ executorServiceRule.submit(() -> {
+ awaitCyclicBarrier(barrier);
+ federatingManager.removeMember(member(), true);
+ });
+
+ await().untilAsserted(() -> {
+ assertThat(federatingManager.pendingTasks()).hasSize(1);
+ });
+
+ latch.countDown();
+
+ await().untilAsserted(() -> {
+ assertThat(federatingManager.pendingTasks()).isEmpty();
+ });
+ }
+
+ @Test
+ public void pendingTasksIsEmptyByDefault() {
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
+
+ assertThat(federatingManager.pendingTasks()).isEmpty();
+ }
+
+ @Test
+ public void restartDoesNotThrowIfOtherMembersExist() {
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ when(distributionManager.getOtherDistributionManagerIds())
+ .thenReturn(singleton(mock(InternalDistributedMember.class)));
+ when(system.getDistributionManager())
+ .thenReturn(distributionManager);
+
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, Executors::newSingleThreadExecutor);
+
+ federatingManager.startManager();
+ federatingManager.stopManager();
+
+ assertThatCode(federatingManager::startManager)
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ public void startManagerThrowsManagementExceptionWithNestedCauseOfFailure() {
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
+ RuntimeException exception = new RuntimeException("startManager failed");
+ doThrow(exception)
+ .when(messenger).broadcastManagerInfo();
+
+ Throwable thrown = catchThrowable(() -> federatingManager.startManager());
+
+ assertThat(thrown)
+ .isInstanceOf(ManagementException.class)
+ .hasCause(exception);
+ }
+
+ @Test
+ public void pendingTasksIsClearIfStartManagerFails() {
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
+ RuntimeException exception = new RuntimeException("startManager failed");
+ doThrow(exception)
+ .when(messenger).broadcastManagerInfo();
+
+ Throwable thrown = catchThrowable(() -> federatingManager.startManager());
+ assertThat(thrown).isNotNull();
+
+ assertThat(federatingManager.pendingTasks()).isEmpty();
+ }
+
+ @Test
+ public void startingIsFalseIfStartManagerFails() {
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
+ RuntimeException exception = new RuntimeException("startManager failed");
+ doThrow(exception)
+ .when(messenger).broadcastManagerInfo();
+
+ Throwable thrown = catchThrowable(() -> federatingManager.startManager());
+ assertThat(thrown).isNotNull();
+
+ assertThat(federatingManager.isStarting()).isFalse();
+ }
+
+ @Test
+ public void runningIsFalseIfStartManagerFails() {
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock,
+ proxyFactory, messenger, executorService);
+ RuntimeException exception = new RuntimeException("startManager failed");
+ doThrow(exception)
+ .when(messenger).broadcastManagerInfo();
+
+ Throwable thrown = catchThrowable(() -> federatingManager.startManager());
+ assertThat(thrown).isNotNull();
+
+ assertThat(federatingManager.isRunning()).isFalse();
+ }
+
+ private void awaitCyclicBarrier(CyclicBarrier barrier) {
+ try {
+ barrier.await(getTimeout().toMillis(), MILLISECONDS);
+ } catch (Exception e) {
+ errorCollector.addError(e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void awaitCountDownLatch(CountDownLatch latch) {
+ try {
+ latch.await(getTimeout().toMillis(), MILLISECONDS);
+ } catch (Exception e) {
+ errorCollector.addError(e);
+ throw new RuntimeException(e);
+ }
+ }
+
private InternalDistributedMember member() {
return member(1, 1);
}
private InternalDistributedMember member(int viewId, int port) {
InternalDistributedMember member = mock(InternalDistributedMember.class);
- when(member.getInetAddress()).thenReturn(mock(InetAddress.class));
- when(member.getVmViewId()).thenReturn(viewId);
- when(member.getMembershipPort()).thenReturn(port);
+ when(member.getInetAddress())
+ .thenReturn(mock(InetAddress.class));
+ when(member.getVmViewId())
+ .thenReturn(viewId);
+ when(member.getMembershipPort())
+ .thenReturn(port);
return member;
}
}