YARN-7252. Removing queue then failing over results in exception
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index 9aeaec6..7c918a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -88,4 +88,10 @@
* @return Max Cluster level App priority.
*/
Priority getMaxClusterLevelAppPriority();
+
+ /**
+ * Returns whether the scheduler configuration is mutable.
+ * @return true if the configuration is mutable, false otherwise
+ */
+ boolean isConfigurationMutable();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index 1ceb6fb..48c289f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueState;
@@ -170,8 +171,14 @@
CSQueue newRoot = parseQueue(this.csContext, newConf, null,
CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP);
- // Ensure queue hiearchy in the new XML file is proper.
- validateQueueHierarchy(queues, newQueues);
+ // When failing over, if using configuration store, don't validate queue
+ // hierarchy since queues can be removed without being STOPPED.
+ if (!csContext.isConfigurationMutable() ||
+ csContext.getRMContext().getHAServiceState()
+ != HAServiceProtocol.HAServiceState.STANDBY) {
+ // Ensure queue hierarchy in the new XML file is proper.
+ validateQueueHierarchy(queues, newQueues);
+ }
// Add new queues and delete OldQeueus only after validation.
updateQueues(queues, newQueues);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
index 355f741..0cf5e6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
@@ -38,18 +38,21 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
/**
* Tests {@link ZKConfigurationStore}.
@@ -303,6 +306,105 @@
rm2.close();
}
+ /**
+ * When failing over, if RM1 stopped and removed a queue that RM2 has in
+ * memory, failing over to RM2 should not throw an exception.
+ * @throws Exception if RM start, failover, or configuration mutation fails
+ */
+ @Test
+ public void testFailoverAfterRemoveQueue() throws Exception {
+ HAServiceProtocol.StateChangeRequestInfo req =
+ new HAServiceProtocol.StateChangeRequestInfo(
+ HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+ Configuration conf1 = createRMHAConf("rm1,rm2", "rm1", 1234);
+ ResourceManager rm1 = new MockRM(conf1);
+ rm1.start();
+ rm1.getRMContext().getRMAdminService().transitionToActive(req);
+ assertEquals("RM with ZKStore didn't start",
+ Service.STATE.STARTED, rm1.getServiceState());
+ assertEquals("RM should be Active",
+ HAServiceProtocol.HAServiceState.ACTIVE,
+ rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+ Configuration conf2 = createRMHAConf("rm1,rm2", "rm2", 5678);
+ ResourceManager rm2 = new MockRM(conf2);
+ rm2.start();
+ assertEquals("RM should be Standby",
+ HAServiceProtocol.HAServiceState.STANDBY,
+ rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+ UserGroupInformation user = UserGroupInformation
+ .createUserForTesting(TEST_USER, new String[0]);
+ MutableConfigurationProvider confProvider = ((MutableConfScheduler)
+ rm1.getResourceScheduler()).getMutableConfProvider();
+ // Add root.a
+ SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
+ Map<String, String> addParams = new HashMap<>();
+ addParams.put("capacity", "100");
+ QueueConfigInfo addInfo = new QueueConfigInfo("root.a", addParams);
+ schedConfUpdateInfo.getAddQueueInfo().add(addInfo);
+ // Stop root.default
+ Map<String, String> stopParams = new HashMap<>();
+ stopParams.put("state", "STOPPED");
+ stopParams.put("capacity", "0");
+ QueueConfigInfo stopInfo = new QueueConfigInfo("root.default", stopParams);
+ schedConfUpdateInfo.getUpdateQueueInfo().add(stopInfo);
+ confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
+ confProvider.confirmPendingMutation(true);
+ assertTrue(Arrays.asList(((MutableConfScheduler) rm1.getResourceScheduler())
+ .getConfiguration().get("yarn.scheduler.capacity.root.queues").split
+ (",")).contains("a"));
+
+ // Remove root.default
+ schedConfUpdateInfo.getUpdateQueueInfo().clear();
+ schedConfUpdateInfo.getAddQueueInfo().clear();
+ schedConfUpdateInfo.getRemoveQueueInfo().add("root.default");
+ confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
+ confProvider.confirmPendingMutation(true);
+ assertEquals("a", ((MutableConfScheduler) rm1.getResourceScheduler())
+ .getConfiguration().get("yarn.scheduler.capacity.root.queues"));
+
+ // Transition RM2 to active and verify it starts with the updated configuration
+ rm2.getRMContext().getRMAdminService().transitionToActive(req);
+ assertEquals("RM with ZKStore didn't start",
+ Service.STATE.STARTED, rm2.getServiceState());
+ assertEquals("RM should be Active",
+ HAServiceProtocol.HAServiceState.ACTIVE,
+ rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+ for (int i = 0; i < ZK_TIMEOUT_MS / 50; i++) {
+ if (HAServiceProtocol.HAServiceState.ACTIVE ==
+ rm1.getRMContext().getRMAdminService().getServiceStatus()
+ .getState()) {
+ Thread.sleep(100);
+ }
+ }
+ assertEquals("RM should have been fenced",
+ HAServiceProtocol.HAServiceState.STANDBY,
+ rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+ assertEquals("RM should be Active",
+ HAServiceProtocol.HAServiceState.ACTIVE,
+ rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+ assertEquals("a", ((MutableCSConfigurationProvider) (
+ (CapacityScheduler) rm2.getResourceScheduler())
+ .getMutableConfProvider()).getConfStore().retrieve()
+ .get("yarn.scheduler.capacity.root.queues"));
+ assertEquals("a", ((MutableConfScheduler) rm2.getResourceScheduler())
+ .getConfiguration().get("yarn.scheduler.capacity.root.queues"));
+ // Transition to standby will set RM's HA status and then reinitialize in
+ // a separate thread. Despite asserting for STANDBY state, it's
+ // possible for reinitialization to be unfinished. Wait here for it to
+ // finish, otherwise closing rm1 will close zkManager and the unfinished
+ // reinitialization will throw an exception.
+ Thread.sleep(10000);
+ rm1.close();
+ rm2.close();
+ }
+
@Override
public YarnConfigurationStore createConfStore() {
return new ZKConfigurationStore();