YARN-10674. fs2cs should generate auto-created queue deletion properties. Contributed by Qi Zhu.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigArgumentHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigArgumentHandler.java
index 084b67d..41e7ba5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigArgumentHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigArgumentHandler.java
@@ -118,6 +118,14 @@
         "pc", "percentage",
         "Converts FS queue weights to percentages",
         false),
+    DISABLE_PREEMPTION("disable preemption", "dp", "disable-preemption",
+        "Disable the preemption with nopolicy or observeonly mode. " +
+            "Preemption is enabled by default. " +
+            "nopolicy removes ProportionalCapacityPreemptionPolicy from " +
+            "the list of monitor policies, " +
+            "observeonly sets " +
+            "yarn.resourcemanager.monitor.capacity.preemption.observe_only " +
+            "to true.", true),
     HELP("help", "h", "help", "Displays the list of options", false);
 
     private final String name;
@@ -251,6 +259,12 @@
         cliParser.getOptionValue(CliOption.CONVERSION_RULES.shortSwitch);
     String outputDir =
         cliParser.getOptionValue(CliOption.OUTPUT_DIR.shortSwitch);
+    FSConfigToCSConfigConverterParams.
+        PreemptionMode preemptionMode =
+        FSConfigToCSConfigConverterParams.
+            PreemptionMode.fromString(cliParser.
+                getOptionValue(CliOption.DISABLE_PREEMPTION.shortSwitch));
+
     boolean convertPlacementRules =
         !cliParser.hasOption(
             CliOption.SKIP_PLACEMENT_RULES_CONVERSION.shortSwitch);
@@ -260,6 +274,10 @@
     checkFile(CliOption.CONVERSION_RULES, conversionRulesFile);
     checkDirectory(CliOption.OUTPUT_DIR, outputDir);
     checkOutputDirDoesNotContainXmls(yarnSiteXmlFile, outputDir);
+    if (cliParser.hasOption(CliOption.
+        DISABLE_PREEMPTION.shortSwitch)) {
+      checkDisablePreemption(preemptionMode);
+    }
 
     // check mapping-rules.json if we intend to generate it
     if (!cliParser.hasOption(CliOption.CONSOLE_MODE.shortSwitch) &&
@@ -281,6 +299,7 @@
             cliParser.hasOption(CliOption.RULES_TO_FILE.shortSwitch))
         .withUsePercentages(
             cliParser.hasOption(CliOption.CONVERT_PERCENTAGES.shortSwitch))
+        .withDisablePreemption(preemptionMode)
         .build();
   }
 
@@ -383,6 +402,16 @@
     }
   }
 
+  private static void checkDisablePreemption(FSConfigToCSConfigConverterParams.
+      PreemptionMode preemptionMode) {
+    if (preemptionMode == FSConfigToCSConfigConverterParams.
+        PreemptionMode.ENABLED) {
+      throw new PreconditionException(
+          "Specified disable-preemption mode is illegal, " +
+              " use nopolicy or observeonly.");
+    }
+  }
+
   private FSConfigToCSConfigConverter getConverter() {
     return new FSConfigToCSConfigConverter(ruleHandler, conversionOptions);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigConverter.java
index a760234..d5d4d90 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigConverter.java
@@ -103,6 +103,8 @@
   private String outputDirectory;
   private boolean rulesToFile;
   private boolean usePercentages;
+  private FSConfigToCSConfigConverterParams.
+      PreemptionMode preemptionMode;
 
   public FSConfigToCSConfigConverter(FSConfigToCSConfigRuleHandler
       ruleHandler, ConversionOptions conversionOptions) {
@@ -121,6 +123,7 @@
     this.outputDirectory = params.getOutputDirectory();
     this.rulesToFile = params.isPlacementRulesToFile();
     this.usePercentages = params.isUsePercentages();
+    this.preemptionMode = params.getPreemptionMode();
     prepareOutputFiles(params.isConsole());
     loadConversionRules(params.getConversionRulesConfig());
     Configuration inputYarnSiteConfig = getInputYarnSiteConfig(params);
@@ -277,7 +280,8 @@
         new FSYarnSiteConverter();
     siteConverter.convertSiteProperties(inputYarnSiteConfig,
         convertedYarnSiteConfig, drfUsed,
-        conversionOptions.isEnableAsyncScheduler());
+        conversionOptions.isEnableAsyncScheduler(),
+        usePercentages, preemptionMode);
 
     preemptionEnabled = siteConverter.isPreemptionEnabled();
     sizeBasedWeight = siteConverter.isSizeBasedWeight();
@@ -291,6 +295,7 @@
     emitDefaultUserMaxParallelApplications();
     emitUserMaxParallelApplications();
     emitDefaultMaxAMShare();
+    emitDisablePreemptionForObserveOnlyMode();
 
     FSQueueConverter queueConverter = FSQueueConverterBuilder.create()
         .withRuleHandler(ruleHandler)
@@ -407,6 +412,14 @@
           queueMaxAMShareDefault);
     }
   }
+  private void emitDisablePreemptionForObserveOnlyMode() {
+    if (preemptionMode == FSConfigToCSConfigConverterParams
+            .PreemptionMode.OBSERVE_ONLY) {
+      capacitySchedulerConfig.
+          setBoolean(CapacitySchedulerConfiguration.
+              PREEMPTION_OBSERVE_ONLY, true);
+    }
+  }
 
   private void emitACLs(FairScheduler fs) {
     fs.getAllocationConfiguration().getQueueAcls()
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigConverterParams.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigConverterParams.java
index 1f51530..bc6adfa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigConverterParams.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigConverterParams.java
@@ -30,6 +30,35 @@
   private boolean convertPlacementRules;
   private boolean placementRulesToFile;
   private boolean usePercentages;
+  private PreemptionMode preemptionMode;
+
+  public enum PreemptionMode {
+    ENABLED("enabled"),
+    NO_POLICY("nopolicy"),
+    OBSERVE_ONLY("observeonly");
+
+    private String cliOption;
+
+    PreemptionMode(String cliOption) {
+      this.cliOption = cliOption;
+    }
+
+    public String getCliOption() {
+      return cliOption;
+    }
+
+    public static PreemptionMode fromString(String cliOption) {
+      if (cliOption != null && cliOption.trim().
+          equals(PreemptionMode.OBSERVE_ONLY.getCliOption())) {
+        return PreemptionMode.OBSERVE_ONLY;
+      } else if (cliOption != null && cliOption.trim().
+          equals(PreemptionMode.NO_POLICY.getCliOption())) {
+        return PreemptionMode.NO_POLICY;
+      } else {
+        return PreemptionMode.ENABLED;
+      }
+    }
+  }
 
   private FSConfigToCSConfigConverterParams() {
     //must use builder
@@ -71,6 +100,10 @@
     return usePercentages;
   }
 
+  public PreemptionMode getPreemptionMode() {
+    return preemptionMode;
+  }
+
   @Override
   public String toString() {
     return "FSConfigToCSConfigConverterParams{" +
@@ -99,6 +132,7 @@
     private boolean convertPlacementRules;
     private boolean placementRulesToFile;
     private boolean usePercentages;
+    private PreemptionMode preemptionMode;
 
     private Builder() {
     }
@@ -152,6 +186,11 @@
       return this;
     }
 
+    public Builder withDisablePreemption(PreemptionMode preemptionMode) {
+      this.preemptionMode = preemptionMode;
+      return this;
+    }
+
     public FSConfigToCSConfigConverterParams build() {
       FSConfigToCSConfigConverterParams params =
           new FSConfigToCSConfigConverterParams();
@@ -164,6 +203,7 @@
       params.convertPlacementRules = this.convertPlacementRules;
       params.placementRulesToFile = this.placementRulesToFile;
       params.usePercentages = this.usePercentages;
+      params.preemptionMode = this.preemptionMode;
       return params;
     }
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSYarnSiteConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSYarnSiteConverter.java
index 4222e3a..401c056c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSYarnSiteConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSYarnSiteConverter.java
@@ -20,6 +20,8 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedQueueDeletionPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
@@ -36,7 +38,9 @@
 
   @SuppressWarnings({"deprecation", "checkstyle:linelength"})
   public void convertSiteProperties(Configuration conf,
-      Configuration yarnSiteConfig, boolean drfUsed, boolean enableAsyncScheduler) {
+      Configuration yarnSiteConfig, boolean drfUsed,
+      boolean enableAsyncScheduler, boolean userPercentage,
+      FSConfigToCSConfigConverterParams.PreemptionMode preemptionMode) {
     yarnSiteConfig.set(YarnConfiguration.RM_SCHEDULER,
         CapacityScheduler.class.getCanonicalName());
 
@@ -52,12 +56,20 @@
           "schedule-asynchronously.scheduling-interval-ms", interval);
     }
 
+    // This should be always true to trigger cs auto
+    // refresh queue.
+    yarnSiteConfig.setBoolean(
+        YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+
     if (conf.getBoolean(FairSchedulerConfiguration.PREEMPTION,
         FairSchedulerConfiguration.DEFAULT_PREEMPTION)) {
-      yarnSiteConfig.setBoolean(
-          YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
       preemptionEnabled = true;
 
+      String policies = addMonitorPolicy(ProportionalCapacityPreemptionPolicy.
+          class.getCanonicalName(), yarnSiteConfig);
+      yarnSiteConfig.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+          policies);
+
       int waitTimeBeforeKill = conf.getInt(
           FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL,
           FairSchedulerConfiguration.DEFAULT_WAIT_TIME_BEFORE_KILL);
@@ -71,6 +83,23 @@
       yarnSiteConfig.setLong(
           CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
           waitBeforeNextStarvationCheck);
+    } else {
+      if (preemptionMode ==
+          FSConfigToCSConfigConverterParams.PreemptionMode.NO_POLICY) {
+        yarnSiteConfig.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, "");
+      }
+    }
+
+    // For auto created queue's auto deletion.
+    if (!userPercentage) {
+      String policies = addMonitorPolicy(AutoCreatedQueueDeletionPolicy.
+          class.getCanonicalName(), yarnSiteConfig);
+      yarnSiteConfig.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+          policies);
+
+      // Set the expired for deletion interval to 10s, consistent with fs.
+      yarnSiteConfig.setInt(CapacitySchedulerConfiguration.
+          AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME, 10);
     }
 
     if (conf.getBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE,
@@ -132,4 +161,17 @@
   public boolean isSizeBasedWeight() {
     return sizeBasedWeight;
   }
+
+  private String addMonitorPolicy(String policyName,
+      Configuration yarnSiteConfig) {
+    String policies =
+        yarnSiteConfig.get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES);
+    if (policies == null || policies.isEmpty()) {
+      policies = policyName;
+    } else {
+      policies += "," + policyName;
+    }
+    return policies;
+  }
+
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSConfigToCSConfigConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSConfigToCSConfigConverter.java
index 5450d40..c1c774b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSConfigToCSConfigConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSConfigToCSConfigConverter.java
@@ -687,6 +687,21 @@
             schedulingEnabledValue);
   }
 
+  @Test
+  public void testSiteDisabledPreemptionWithObserveOnlyConversion()
+      throws Exception{
+    FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
+        .withDisablePreemption(FSConfigToCSConfigConverterParams.
+            PreemptionMode.OBSERVE_ONLY)
+        .build();
+
+    converter.convert(params);
+    assertTrue("The observe only should be true",
+        converter.getCapacitySchedulerConfig().
+            getBoolean(CapacitySchedulerConfiguration.
+                PREEMPTION_OBSERVE_ONLY, false));
+  }
+
   private boolean testConversionWithAsyncSchedulingOption(boolean enabled) throws Exception {
     FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
             .withClusterResource(CLUSTER_RESOURCE_STRING)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSYarnSiteConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSYarnSiteConverter.java
index 9cebf16..55ac242 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSYarnSiteConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSYarnSiteConverter.java
@@ -18,6 +18,8 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedQueueDeletionPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -28,6 +30,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotEquals;
 
 /**
  * Unit tests for FSYarnSiteConverter.
@@ -37,6 +40,8 @@
   private Configuration yarnConfig;
   private FSYarnSiteConverter converter;
   private Configuration yarnConvertedConfig;
+  private static final String DELETION_POLICY_CLASS =
+      AutoCreatedQueueDeletionPolicy.class.getCanonicalName();
 
   @Before
   public void setup() {
@@ -54,7 +59,7 @@
         FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS, 666);
 
     converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
-      false);
+        false, false, null);
 
     assertTrue("Cont. scheduling", yarnConvertedConfig.getBoolean(
         CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false));
@@ -73,7 +78,7 @@
           321);
 
     converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
-      false);
+        false, false, null);
 
     assertTrue("Preemption enabled",
         yarnConvertedConfig.getBoolean(
@@ -87,6 +92,41 @@
         yarnConvertedConfig.getInt(
             CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
               -1));
+
+    assertFalse("Observe_only should be false",
+        yarnConvertedConfig.getBoolean(CapacitySchedulerConfiguration.
+                PREEMPTION_OBSERVE_ONLY, false));
+
+    assertTrue("Should contain ProportionalCapacityPreemptionPolicy.",
+        yarnConvertedConfig.
+            get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES).
+        contains(ProportionalCapacityPreemptionPolicy.
+        class.getCanonicalName()));
+  }
+
+  @Test
+  public void testSiteDisabledPreemptionWithNoPolicyConversion() {
+    // Default mode is nopolicy
+    yarnConfig.setBoolean(FairSchedulerConfiguration.PREEMPTION, false);
+    converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
+        false, false,  null);
+
+    assertFalse("Should not contain ProportionalCapacityPreemptionPolicy.",
+        yarnConvertedConfig.
+            get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES).
+            contains(ProportionalCapacityPreemptionPolicy.
+                class.getCanonicalName()));
+
+    yarnConfig.setBoolean(FairSchedulerConfiguration.PREEMPTION, false);
+    converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
+        false, false,
+        FSConfigToCSConfigConverterParams.PreemptionMode.NO_POLICY);
+
+    assertFalse("Should not contain ProportionalCapacityPreemptionPolicy.",
+        yarnConvertedConfig.
+            get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES).
+            contains(ProportionalCapacityPreemptionPolicy.
+                class.getCanonicalName()));
   }
 
   @Test
@@ -94,7 +134,7 @@
     yarnConfig.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
 
     converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
-      false);
+        false, false, null);
 
     assertTrue("Assign multiple",
         yarnConvertedConfig.getBoolean(
@@ -107,7 +147,7 @@
     yarnConfig.setInt(FairSchedulerConfiguration.MAX_ASSIGN, 111);
 
     converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
-      false);
+        false, false, null);
 
     assertEquals("Max assign", 111,
         yarnConvertedConfig.getInt(
@@ -122,7 +162,7 @@
         "321.321");
 
     converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
-      false);
+        false, false, null);
 
     assertEquals("Locality threshold node", "123.123",
         yarnConvertedConfig.get(
@@ -135,7 +175,7 @@
   @Test
   public void testSiteDrfEnabledConversion() {
     converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, true,
-      false);
+        false, false, null);
 
     assertEquals("Resource calculator type", DominantResourceCalculator.class,
         yarnConvertedConfig.getClass(
@@ -145,7 +185,7 @@
   @Test
   public void testSiteDrfDisabledConversion() {
     converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
-      false);
+        false, false, null);
 
     assertEquals("Resource calculator type", DefaultResourceCalculator.class,
         yarnConvertedConfig.getClass(
@@ -156,7 +196,7 @@
   @Test
   public void testAsyncSchedulingEnabledConversion() {
     converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, true,
-            true);
+            true, false, null);
 
     assertTrue("Asynchronous scheduling", yarnConvertedConfig.getBoolean(
                     CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
@@ -166,10 +206,79 @@
   @Test
   public void testAsyncSchedulingDisabledConversion() {
     converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
-            false);
+            false, false, null);
 
     assertFalse("Asynchronous scheduling", yarnConvertedConfig.getBoolean(
             CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
             CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE));
   }
+
+  @Test
+  public void testSiteQueueAutoDeletionConversionWithWeightMode() {
+    converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
+        false, false, null);
+    assertTrue(yarnConvertedConfig.get(YarnConfiguration.
+        RM_SCHEDULER_ENABLE_MONITORS), true);
+    assertTrue("Scheduling Policies contain auto deletion policy",
+        yarnConvertedConfig.
+            get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES)
+            .contains(DELETION_POLICY_CLASS));
+
+    // Test when policy has existed.
+    yarnConvertedConfig.
+        set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+        "testPolicy");
+    converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
+        false, false, null);
+    assertTrue("Scheduling Policies contain auto deletion policy",
+        yarnConvertedConfig.
+            get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES)
+            .contains(DELETION_POLICY_CLASS));
+
+    assertEquals("Auto deletion policy expired time should be 10s",
+        10, yarnConvertedConfig.
+            getLong(CapacitySchedulerConfiguration.
+                    AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME,
+                CapacitySchedulerConfiguration.
+                    DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME));
+  }
+
+  @Test
+  public void
+      testSiteQueueAutoDeletionConversionDisabledForPercentageMode() {
+
+    // test percentage mode
+    converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
+        false, true, null);
+    assertTrue(yarnConvertedConfig.get(YarnConfiguration.
+        RM_SCHEDULER_ENABLE_MONITORS), true);
+
+    assertTrue("Scheduling Policies should not" +
+            "contain auto deletion policy in percentage mode",
+        yarnConvertedConfig.
+            get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES) == null ||
+            !yarnConvertedConfig.
+            get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES).
+                contains(DELETION_POLICY_CLASS));
+
+    yarnConvertedConfig.
+        set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+            "testPolicy");
+    converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
+        false, true, null);
+    assertFalse("Scheduling Policies should not " +
+            "contain auto deletion policy in percentage mode",
+        yarnConvertedConfig.
+            get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES)
+            .contains(DELETION_POLICY_CLASS));
+
+    assertNotEquals("Auto deletion policy expired time should not " +
+            "be set in percentage mode",
+        10, yarnConvertedConfig.
+            getLong(CapacitySchedulerConfiguration.
+                    AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME,
+                CapacitySchedulerConfiguration.
+                    DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME));
+
+  }
 }