/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

public class TestNodeLabelContainerAllocation {
  private final int GB = 1024;

  private YarnConfiguration conf;
  
  RMNodeLabelsManager mgr;

  @Before
  public void setUp() throws Exception {
    conf = new YarnConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
      ResourceScheduler.class);
    mgr = new NullRMNodeLabelsManager();
    mgr.init(conf);
  }
  
  private Configuration getConfigurationWithQueueLabels(Configuration config) {
    CapacitySchedulerConfiguration conf =
        new CapacitySchedulerConfiguration(config);
    
    // Define top-level queues
    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);

    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
    conf.setCapacity(A, 10);
    conf.setMaximumCapacity(A, 15);
    conf.setAccessibleNodeLabels(A, toSet("x"));
    conf.setCapacityByLabel(A, "x", 100);

    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
    conf.setCapacity(B, 20);
    conf.setAccessibleNodeLabels(B, toSet("y", "z"));
    conf.setCapacityByLabel(B, "y", 100);
    conf.setCapacityByLabel(B, "z", 100);

    final String C = CapacitySchedulerConfiguration.ROOT + ".c";
    conf.setCapacity(C, 70);
    conf.setMaximumCapacity(C, 70);
    conf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET);
    
    // Define 2nd-level queues
    final String A1 = A + ".a1";
    conf.setQueues(A, new String[] {"a1"});
    conf.setCapacity(A1, 100);
    conf.setMaximumCapacity(A1, 100);
    conf.setCapacityByLabel(A1, "x", 100);
    
    final String B1 = B + ".b1";
    conf.setQueues(B, new String[] {"b1"});
    conf.setCapacity(B1, 100);
    conf.setMaximumCapacity(B1, 100);
    conf.setCapacityByLabel(B1, "y", 100);
    conf.setCapacityByLabel(B1, "z", 100);

    final String C1 = C + ".c1";
    conf.setQueues(C, new String[] {"c1"});
    conf.setCapacity(C1, 100);
    conf.setMaximumCapacity(C1, 100);
    
    return conf;
  }
  
  private void checkTaskContainersHost(ApplicationAttemptId attemptId,
      ContainerId containerId, ResourceManager rm, String host) {
    YarnScheduler scheduler = rm.getRMContext().getScheduler();
    SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId);

    Assert.assertTrue(appReport.getLiveContainers().size() > 0);
    for (RMContainer c : appReport.getLiveContainers()) {
      if (c.getContainerId().equals(containerId)) {
        Assert.assertEquals(host, c.getAllocatedNode().getHost());
      }
    }
  }
  
  @SuppressWarnings("unchecked")
  private <E> Set<E> toSet(E... elements) {
    Set<E> set = Sets.newHashSet(elements);
    return set;
  }
  
  
  @Test (timeout = 300000)
  public void testContainerAllocationWithSingleUserLimits() throws Exception {
    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
    mgr.init(conf);

    // set node -> label
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
        NodeId.newInstance("h2", 0), toSet("y")));

    // inject node label manager
    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
    rm1.registerNode("h2:1234", 8000); // label = y
    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>

    // launch an app to queue a1 (label = x), and check all container will
    // be allocated in h1
    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
    
    // A has only 10% of x, so it can only allocate one container in label=empty
    ContainerId containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
    Assert.assertTrue(rm1.waitForState(nm3, containerId,
          RMContainerState.ALLOCATED));
    // Cannot allocate 2nd label=empty container
    containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
    Assert.assertFalse(rm1.waitForState(nm3, containerId,
          RMContainerState.ALLOCATED));

    // A has default user limit = 100, so it can use all resource in label = x
    // We can allocate floor(8000 / 1024) = 7 containers
    for (int id = 3; id <= 8; id++) {
      containerId =
          ContainerId.newContainerId(am1.getApplicationAttemptId(), id);
      am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
      Assert.assertTrue(rm1.waitForState(nm1, containerId,
          RMContainerState.ALLOCATED));
    }
    rm1.close();
  }
  
  @Test(timeout = 300000)
  public void testContainerAllocateWithComplexLabels() throws Exception {
    /*
     * Queue structure:
     *                      root (*)
     *                  ________________
     *                 /                \
     *               a x(100%), y(50%)   b y(50%), z(100%)
     *               ________________    ______________
     *              /                   /              \
     *             a1 (x,y)         b1(no)              b2(y,z)
     *               100%                          y = 100%, z = 100%
     *                           
     * Node structure:
     * h1 : x
     * h2 : y
     * h3 : y
     * h4 : z
     * h5 : NO
     * 
     * Total resource:
     * x: 4G
     * y: 6G
     * z: 2G
     * *: 2G
     * 
     * Resource of
     * a1: x=4G, y=3G, NO=0.2G
     * b1: NO=0.9G (max=1G)
     * b2: y=3, z=2G, NO=0.9G (max=1G)
     * 
     * Each node can only allocate two containers
     */

    // set node -> label
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
        toSet("x"), NodeId.newInstance("h2", 0), toSet("y"),
        NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0),
        toSet("z"), NodeId.newInstance("h5", 0),
        RMNodeLabelsManager.EMPTY_STRING_SET));

    // inject node label manager
    MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 2048);
    MockNM nm2 = rm1.registerNode("h2:1234", 2048);
    MockNM nm3 = rm1.registerNode("h3:1234", 2048);
    MockNM nm4 = rm1.registerNode("h4:1234", 2048);
    MockNM nm5 = rm1.registerNode("h5:1234", 2048);
    
    ContainerId containerId;

    // launch an app to queue a1 (label = x), and check all container will
    // be allocated in h1
    RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // request a container (label = y). can be allocated on nm2 
    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
    containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L);
    Assert.assertTrue(rm1.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
        "h2");

    // launch an app to queue b1 (label = y), and check all container will
    // be allocated in h5
    RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5);

    // request a container for AM, will succeed
    // and now b1's queue capacity will be used, cannot allocate more containers
    // (Maximum capacity reached)
    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm4, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertFalse(rm1.waitForState(nm5, containerId,
        RMContainerState.ALLOCATED));
    
    // launch an app to queue b2
    RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2");
    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5);

    // request a container. try to allocate on nm1 (label = x) and nm3 (label =
    // y,z). Will successfully allocate on nm3
    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm1, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertTrue(rm1.waitForState(nm3, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
        "h3");
    
    // try to allocate container (request label = z) on nm4 (label = y,z). 
    // Will successfully allocate on nm4 only.
    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "z");
    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 3L);
    Assert.assertTrue(rm1.waitForState(nm4, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
        "h4");

    rm1.close();
  }

  @Test (timeout = 120000)
  public void testContainerAllocateWithLabels() throws Exception {
    // set node -> label
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
        NodeId.newInstance("h2", 0), toSet("y")));

    // inject node label manager
    MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
    MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
    
    ContainerId containerId;

    // launch an app to queue a1 (label = x), and check all container will
    // be allocated in h1
    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3);

    // request a container.
    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
    containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertTrue(rm1.waitForState(nm1, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
        "h1");

    // launch an app to queue b1 (label = y), and check all container will
    // be allocated in h2
    RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);

    // request a container.
    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm1, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertTrue(rm1.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
        "h2");
    
    // launch an app to queue c1 (label = ""), and check all container will
    // be allocated in h3
    RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);

    // request a container.
    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertTrue(rm1.waitForState(nm3, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
        "h3");

    rm1.close();
  }
  
  @Test (timeout = 120000)
  public void testContainerAllocateWithDefaultQueueLabels() throws Exception {
    // This test is pretty much similar to testContainerAllocateWithLabel.
    // Difference is, this test doesn't specify label expression in ResourceRequest,
    // instead, it uses default queue label expression

    // set node -> label
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
        NodeId.newInstance("h2", 0), toSet("y")));

    // inject node label manager
    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
    MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
    
    ContainerId containerId;

    // launch an app to queue a1 (label = x), and check all container will
    // be allocated in h1
    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // request a container.
    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>());
    containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm3, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertTrue(rm1.waitForState(nm1, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
        "h1");

    // launch an app to queue b1 (label = y), and check all container will
    // be allocated in h2
    RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);

    // request a container.
    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm3, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertTrue(rm1.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
        "h2");
    
    // launch an app to queue c1 (label = ""), and check all container will
    // be allocated in h3
    RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);

    // request a container.
    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertTrue(rm1.waitForState(nm3, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
        "h3");

    rm1.close();
  }

  @Test (timeout = 120000)
  public void testContainerReservationWithLabels() throws Exception {
    // This test is pretty much similar to testContainerAllocateWithLabel.
    // Difference is, this test doesn't specify label expression in
    // ResourceRequest,
    // instead, it uses default queue label expression

    // set node -> label
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y",
        "z"));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
        toSet("x"), NodeId.newInstance("h2", 0), toSet("y"),
        NodeId.newInstance("h3", 0), toSet("x")));

    // inject node label manager
    MockRM rm1 = new MockRM(
        TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
    rm1.registerNode("h2:1234", 8 * GB); // label = y
    rm1.registerNode("h3:1234", 8 * GB); // label = x

    ContainerId containerId;

    // launch an app to queue a1 (label = x), and check all container will
    // be allocated in h1
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // request a container.
    am1.allocate("*", 4 * GB, 2, new ArrayList<ContainerId>());
    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);

    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");

    // Do node heartbeats 2 times
    // First time will allocate container for app1, second time will reserve
    // container for app1
    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
        "h1");

    // Check if a 4G container allocated for app1, and 4G is reserved
    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1
        .getApplicationAttemptId());
    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
    Assert.assertTrue(schedulerApp1.getReservedContainers().size() > 0);
    Assert.assertEquals(9 * GB, cs.getRootQueue().getQueueResourceUsage()
        .getUsed("x").getMemorySize());
    Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
        .getReserved("x").getMemorySize());
    Assert.assertEquals(4 * GB,
        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());

    // Cancel asks of app2 and re-kick RM
    am1.allocate("*", 4 * GB, 0, new ArrayList<ContainerId>());
    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));

    Assert.assertEquals(5 * GB, cs.getRootQueue().getQueueResourceUsage()
        .getUsed("x").getMemorySize());
    Assert.assertEquals(0, cs.getRootQueue().getQueueResourceUsage()
        .getReserved("x").getMemorySize());
    Assert.assertEquals(0, leafQueue.getQueueResourceUsage().getReserved("x")
        .getMemorySize());
    rm1.close();
  }

  @Test (timeout = 120000)
  public void testRMContainerLeakInLeafQueue() throws Exception {
    // set node -> label
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
        NodeId.newInstance("h2", 0), toSet("x")));

    // inject node label manager
    MockRM rm1 =
        new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
          @Override public RMNodeLabelsManager createNodeLabelManager() {
            return mgr;
          }
        };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
    rm1.registerNode("h2:1234", 8 * GB); // label = x

    // launch an app to queue a1 (label = x), and check all container will
    // be allocated in h1
    RMApp app1 = rm1.submitApp(1 * GB, "app1", "user", null, "a1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    RMApp app2 = rm1.submitApp(1 * GB, "app2", "user", null, "a1");
    MockRM.launchAndRegisterAM(app2, rm1, nm1);

    // request a container.
    am1.allocate("*", 7 * GB, 2, new ArrayList<ContainerId>());

    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");

    // Do node heartbeats 1 time
    // scheduler will reserve a container for app1
    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));

    // Check if a 4G container allocated for app1, and 4G is reserved
    FiCaSchedulerApp schedulerApp1 =
        cs.getApplicationAttempt(am1.getApplicationAttemptId());
    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
    Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());

    // kill app2 then do node heartbeat 1 time
    // scheduler will allocate a container from the reserved container on nm1
    rm1.killApp(app2.getApplicationId());
    rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED);
    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
    Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());

    // After kill app1, LeafQueue#ignorePartitionExclusivityRMContainers should
    // be clean, otherwise resource leak happened
    rm1.killApp(app1.getApplicationId());
    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
    Assert.assertEquals(0, leafQueue.getIgnoreExclusivityRMContainers().size());

    rm1.close();
  }

  private void checkPendingResource(MockRM rm, int priority,
      ApplicationAttemptId attemptId, int memory) {
    CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
    FiCaSchedulerApp app = cs.getApplicationAttempt(attemptId);
    PendingAsk ask =
        app.getAppSchedulingInfo().getPendingAsk(
            TestUtils.toSchedulerKey(priority), "*");
    Assert.assertEquals(memory,
        ask.getPerAllocationResource().getMemorySize() * ask
            .getCount());
  }
  
  private void checkLaunchedContainerNumOnNode(MockRM rm, NodeId nodeId,
      int numContainers) {
    CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
    SchedulerNode node = cs.getSchedulerNode(nodeId);
    Assert.assertEquals(numContainers, node.getNumContainers());
  }

  /**
   * JIRA YARN-4140, In Resource request set node label will be set only on ANY
   * reqest. RACK/NODE local and default requests label expression need to be
   * updated. This testcase is to verify the label expression is getting changed
   * based on ANY requests.
   *
   * @throws Exception
   */
  @Test
  public void testResourceRequestUpdateNodePartitions() throws Exception {
    // set node -> label
    mgr.addToCluserNodeLabels(ImmutableSet.of(NodeLabel.newInstance("x"),
        NodeLabel.newInstance("y", false), NodeLabel.newInstance("z", false)));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
    // inject node label manager
    MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm2 = rm1.registerNode("h2:1234", 40 * GB); // label = y
    // launch an app to queue b1 (label = y), AM container should be launched in
    // nm2
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
    // Creating request set when request before ANY is not having label and any
    // is having label
    List<ResourceRequest> resourceRequest = new ArrayList<ResourceRequest>();
    resourceRequest.add(am1.createResourceReq("/default-rack", 1024, 3, 1,
        RMNodeLabelsManager.NO_LABEL));
    resourceRequest.add(am1.createResourceReq("*", 1024, 3, 5, "y"));
    resourceRequest.add(am1.createResourceReq("h1:1234", 1024, 3, 2,
        RMNodeLabelsManager.NO_LABEL));
    resourceRequest.add(am1.createResourceReq("*", 1024, 2, 3, "y"));
    resourceRequest.add(am1.createResourceReq("h2:1234", 1024, 2, 4, null));
    resourceRequest.add(am1.createResourceReq("*", 1024, 4, 3, null));
    resourceRequest.add(am1.createResourceReq("h2:1234", 1024, 4, 4, null));
    am1.allocate(resourceRequest, new ArrayList<ContainerId>());
    CapacityScheduler cs =
        (CapacityScheduler) rm1.getRMContext().getScheduler();
    FiCaSchedulerApp app =
        cs.getApplicationAttempt(am1.getApplicationAttemptId());
    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 2, "y");
    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 3, "y");
    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 4,
        RMNodeLabelsManager.NO_LABEL);

    // Previous any request was Y trying to update with z and the
    // request before ANY label is null
    List<ResourceRequest> newReq = new ArrayList<ResourceRequest>();
    newReq.add(am1.createResourceReq("h2:1234", 1024, 3, 4, null));
    newReq.add(am1.createResourceReq("*", 1024, 3, 5, "z"));
    newReq.add(am1.createResourceReq("h1:1234", 1024, 3, 4, null));
    newReq.add(am1.createResourceReq("*", 1024, 4, 5, "z"));
    am1.allocate(newReq, new ArrayList<ContainerId>());

    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 3, "z");
    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 4, "z");
    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 2, "y");

    // Request before ANY and ANY request is set as NULL. Request should be set
    // with Empty Label
    List<ResourceRequest> resourceRequest1 = new ArrayList<ResourceRequest>();
    resourceRequest1.add(am1.createResourceReq("/default-rack", 1024, 3, 1,
        null));
    resourceRequest1.add(am1.createResourceReq("*", 1024, 3, 5, null));
    resourceRequest1.add(am1.createResourceReq("h1:1234", 1024, 3, 2,
        RMNodeLabelsManager.NO_LABEL));
    resourceRequest1.add(am1.createResourceReq("/default-rack", 1024, 2, 1,
        null));
    resourceRequest1.add(am1.createResourceReq("*", 1024, 2, 3,
        RMNodeLabelsManager.NO_LABEL));
    resourceRequest1.add(am1.createResourceReq("h2:1234", 1024, 2, 4, null));
    am1.allocate(resourceRequest1, new ArrayList<ContainerId>());

    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 3,
        RMNodeLabelsManager.NO_LABEL);
    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 2,
        RMNodeLabelsManager.NO_LABEL);
  }

  private void checkNodePartitionOfRequestedPriority(AppSchedulingInfo info,
      int priority, String expectedPartition) {
    for (SchedulerRequestKey key : info.getSchedulerKeys()) {
      if (key.getPriority().getPriority() == priority) {
        Assert.assertEquals("Expected partition is " + expectedPartition,
            expectedPartition,
            info.getAppPlacementAllocator(key)
                .getPrimaryRequestedNodePartition());
      }
    }
  }

  @Test
  public void testPreferenceOfNeedyAppsTowardsNodePartitions() throws Exception {
    /**
     * Test case: Submit two application to a queue (app1 first then app2), app1
     * asked for no-label, app2 asked for label=x, when node1 has label=x
     * doing heart beat, app2 will get allocation first, even if app2 submits later
     * than app1
     */
    
    // set node -> label
    mgr.addToCluserNodeLabels(ImmutableSet.of(
        NodeLabel.newInstance("x"), NodeLabel.newInstance("y", false)));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));

    // inject node label manager
    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>

    // launch an app to queue b1 (label = y), AM container should be launched in nm2
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
    
    // launch another app to queue b1 (label = y), AM container should be launched in nm2
    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);

    // request container and nm1 do heartbeat (nm2 has label=y), note that app1
    // request non-labeled container, and app2 request labeled container, app2
    // will get allocated first even if app1 submitted first.  
    am1.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>());
    am2.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>(), "y");
    
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
    
    // Do node heartbeats many times
    for (int i = 0; i < 50; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
    }
    
    // App2 will get preference to be allocated on node1, and node1 will be all
    // used by App2.
    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(am2.getApplicationAttemptId());
    // app1 get nothing in nm1 (partition=y)
    checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(), schedulerApp1);
    checkNumOfContainersInAnAppOnGivenNode(9, nm2.getNodeId(), schedulerApp1);
    // app2 get all resource in nm1 (partition=y)
    checkNumOfContainersInAnAppOnGivenNode(8, nm1.getNodeId(), schedulerApp2);
    checkNumOfContainersInAnAppOnGivenNode(1, nm2.getNodeId(), schedulerApp2);
    
    rm1.close();
  }
  
  private void checkNumOfContainersInAnAppOnGivenNode(int expectedNum,
      NodeId nodeId, FiCaSchedulerApp app) {
    int num = 0;
    for (RMContainer container : app.getLiveContainers()) {
      if (container.getAllocatedNode().equals(nodeId)) {
        num++;
      }
    }
    Assert.assertEquals(expectedNum, num);
  }
  
  @Test
  public void
      testPreferenceOfNeedyPrioritiesUnderSameAppTowardsNodePartitions()
          throws Exception {
    /**
     * Test case: Submit one application, it asks label="" in priority=1 and
     * label="x" in priority=2, when a node with label=x heartbeat, priority=2
     * will get allocation first even if there're pending resource in priority=1
     */
    
    // set node -> label
    mgr.addToCluserNodeLabels(ImmutableSet.of(
        NodeLabel.newInstance("x"), NodeLabel.newInstance("y", false)));
    // Makes y to be non-exclusive node labels
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));

    // inject node label manager
    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>

    // launch an app to queue b1 (label = y), AM container should be launched in nm3
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
    
    // request containers from am2, priority=1 asks for "" and priority=2 asks
    // for "y", "y" container should be allocated first
    am1.allocate("*", 1 * GB, 1, 1, new ArrayList<ContainerId>(), "");
    am1.allocate("*", 1 * GB, 1, 2, new ArrayList<ContainerId>(), "y");

    // Do a node heartbeat once
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    cs.handle(new NodeUpdateSchedulerEvent(
        rm1.getRMContext().getRMNodes().get(nm1.getNodeId())));
    
    // Check pending resource for am2, priority=1 doesn't get allocated before
    // priority=2 allocated
    checkPendingResource(rm1, 1, am1.getApplicationAttemptId(), 1 * GB);
    checkPendingResource(rm1, 2, am1.getApplicationAttemptId(), 0 * GB);
    
    rm1.close();
  }
  
  @Test
  public void testNonLabeledResourceRequestGetPreferrenceToNonLabeledNode()
      throws Exception {
    /**
     * Test case: Submit one application, it asks 6 label="" containers, NM1
     * with label=y and NM2 has no label, NM1/NM2 doing heartbeat together. Even
     * if NM1 has idle resource, containers are all allocated to NM2 since
     * non-labeled request should get allocation on non-labeled nodes first.
     */
    
    mgr.addToCluserNodeLabels(ImmutableSet.of(
        NodeLabel.newInstance("x", false), NodeLabel.newInstance("y")));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

    // inject node label manager
    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
    
    ContainerId nextContainerId;

    // launch an app to queue b1 (label = y), AM container should be launched in nm3
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
    
    // request containers from am2, priority=1 asks for "" * 6 (id from 4 to 9),
    // nm2/nm3 do
    // heartbeat at the same time, check containers are always allocated to nm3.
    // This is to verify when there's resource available in non-labeled
    // partition, non-labeled resource should allocate to non-labeled partition
    // first.
    am1.allocate("*", 1 * GB, 6, 1, new ArrayList<ContainerId>(), "");
    for (int i = 2; i < 2 + 6; i++) {
      nextContainerId =
          ContainerId.newContainerId(am1.getApplicationAttemptId(), i);
      Assert.assertTrue(rm1.waitForState(Arrays.asList(nm1, nm2),
          nextContainerId, RMContainerState.ALLOCATED));
    }
    // no more container allocated on nm1
    checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 0);
    // all 7 (1 AM container + 6 task container) containers allocated on nm2
    checkLaunchedContainerNumOnNode(rm1, nm2.getNodeId(), 7);   
    
    rm1.close();
  }

  @Test
  public void testPreferenceOfQueuesTowardsNodePartitions()
      throws Exception {
    /**
     * Test case: have a following queue structure:
     * 
     * <pre>
     *            root
     *         /   |   \
     *        a     b    c
     *       / \   / \  /  \
     *      a1 a2 b1 b2 c1 c2
     *     (x)    (x)   (x)
     * </pre>
     * 
     * Only a1, b1, c1 can access label=x, and their default label=x Each each
     * has one application, asks for 5 containers. NM1 has label=x
     * 
     * NM1/NM2 doing heartbeat for 15 times, it should allocate all 15
     * containers with label=x
     */
    
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration(this.conf);
    
    // Define top-level queues
    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
    csConf.setCapacity(A, 33);
    csConf.setAccessibleNodeLabels(A, toSet("x"));
    csConf.setCapacityByLabel(A, "x", 33);
    csConf.setQueues(A, new String[] {"a1", "a2"});
    
    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
    csConf.setCapacity(B, 33);
    csConf.setAccessibleNodeLabels(B, toSet("x"));
    csConf.setCapacityByLabel(B, "x", 33);
    csConf.setQueues(B, new String[] {"b1", "b2"});
    
    final String C = CapacitySchedulerConfiguration.ROOT + ".c";
    csConf.setCapacity(C, 34);
    csConf.setAccessibleNodeLabels(C, toSet("x"));
    csConf.setCapacityByLabel(C, "x", 34);
    csConf.setQueues(C, new String[] {"c1", "c2"});
    
    // Define 2nd-level queues
    final String A1 = A + ".a1";
    csConf.setCapacity(A1, 50);
    csConf.setCapacityByLabel(A1, "x", 100);
    csConf.setDefaultNodeLabelExpression(A1, "x");
    
    final String A2 = A + ".a2";
    csConf.setCapacity(A2, 50);
    csConf.setCapacityByLabel(A2, "x", 0);
    
    final String B1 = B + ".b1";
    csConf.setCapacity(B1, 50);
    csConf.setCapacityByLabel(B1, "x", 100);
    csConf.setDefaultNodeLabelExpression(B1, "x");
    
    final String B2 = B + ".b2";
    csConf.setCapacity(B2, 50);
    csConf.setCapacityByLabel(B2, "x", 0);
    
    final String C1 = C + ".c1";
    csConf.setCapacity(C1, 50);
    csConf.setCapacityByLabel(C1, "x", 100);
    csConf.setDefaultNodeLabelExpression(C1, "x");
    
    final String C2 = C + ".c2";
    csConf.setCapacity(C2, 50);
    csConf.setCapacityByLabel(C2, "x", 0);
    
    // set node -> label
    mgr.addToCluserNodeLabels(ImmutableSet.of(
        NodeLabel.newInstance("x", false), NodeLabel.newInstance("y")));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

    // inject node label manager
    MockRM rm1 = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); // label = x
    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>

    // app1 -> a1
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
    
    // app2 -> a2
    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a2");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
    
    // app3 -> b1
    RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
    
    // app4 -> b2
    RMApp app4 = rm1.submitApp(1 * GB, "app", "user", null, "b2");
    MockAM am4 = MockRM.launchAndRegisterAM(app4, rm1, nm2);
    
    // app5 -> c1
    RMApp app5 = rm1.submitApp(1 * GB, "app", "user", null, "c1");
    MockAM am5 = MockRM.launchAndRegisterAM(app5, rm1, nm1);
    
    // app6 -> b2
    RMApp app6 = rm1.submitApp(1 * GB, "app", "user", null, "c2");
    MockAM am6 = MockRM.launchAndRegisterAM(app6, rm1, nm2);
    
    // Each application request 5 * 1GB container
    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
    am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
    am3.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
    am4.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
    am5.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
    am6.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
    
    // NM1 do 15 heartbeats
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    for (int i = 0; i < 15; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    }
    
    // NM1 get 15 new containers (total is 18, 15 task containers and 3 AM
    // containers)
    checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 18);

    // Check pending resource each application
    // APP1/APP3/APP5 get satisfied, and APP2/APP2/APP3 get nothing.
    checkPendingResource(rm1, 1, am1.getApplicationAttemptId(), 0 * GB);
    checkPendingResource(rm1, 1, am2.getApplicationAttemptId(), 5 * GB);
    checkPendingResource(rm1, 1, am3.getApplicationAttemptId(), 0 * GB);
    checkPendingResource(rm1, 1, am4.getApplicationAttemptId(), 5 * GB);
    checkPendingResource(rm1, 1, am5.getApplicationAttemptId(), 0 * GB);
    checkPendingResource(rm1, 1, am6.getApplicationAttemptId(), 5 * GB);

    rm1.close();
  }
  
  @Test
  public void testQueuesWithoutAccessUsingPartitionedNodes() throws Exception {
    /**
     * Test case: have a following queue structure:
     * 
     * <pre>
     *            root
     *         /      \
     *        a        b
     *        (x)
     * </pre>
     * 
     * Only a can access label=x, two nodes in the cluster, n1 has x and n2 has
     * no-label.
     * 
     * When user-limit-factor=5, submit one application in queue b and request
     * for infinite containers should be able to use up all cluster resources.
     */
    
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration(this.conf);
    
    // Define top-level queues
    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
    csConf.setCapacity(A, 50);
    csConf.setAccessibleNodeLabels(A, toSet("x"));
    csConf.setCapacityByLabel(A, "x", 100);
    
    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
    csConf.setCapacity(B, 50);
    csConf.setAccessibleNodeLabels(B, new HashSet<String>());
    csConf.setUserLimitFactor(B, 5);
    
    // set node -> label
    mgr.addToCluserNodeLabels(ImmutableSet.of(
        NodeLabel.newInstance("x", false), NodeLabel.newInstance("y")));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

    // inject node label manager
    MockRM rm1 = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <empty>

    // app1 -> b
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
    
    // Each application request 50 * 1GB container
    am1.allocate("*", 1 * GB, 50, new ArrayList<ContainerId>());
    
    // NM1 do 50 heartbeats
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
    
    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
    
    // How much cycles we waited to be allocated when available resource only on
    // partitioned node
    int cycleWaited = 0;
    for (int i = 0; i < 50; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
      if (schedulerNode1.getNumContainers() == 0) {
        cycleWaited++;
      }
    }
    // We will will 10 cycles before get allocated on partitioned node
    // NM2 can allocate 10 containers totally, exclude already allocated AM
    // container, we will wait 9 to fulfill non-partitioned node, and need wait
    // one more cycle before allocating to non-partitioned node 
    Assert.assertEquals(10, cycleWaited);
    
    // Both NM1/NM2 launched 10 containers, cluster resource is exhausted
    checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 10);
    checkLaunchedContainerNumOnNode(rm1, nm2.getNodeId(), 10);

    rm1.close();
  }
  
  @Test
  public void testAMContainerAllocationWillAlwaysBeExclusive()
      throws Exception {
    /**
     * Test case: Submit one application without partition, trying to allocate a
     * node has partition=x, it should fail to allocate since AM container will
     * always respect exclusivity for partitions
     */
    
    // set node -> label
    mgr.addToCluserNodeLabels(ImmutableSet.of(
        NodeLabel.newInstance("x", false), NodeLabel.newInstance("y")));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

    // inject node label manager
    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    String nodeIdStr = "h1:1234";
    MockNM nm1 = rm1.registerNode(nodeIdStr, 8 * GB); // label = x

    // launch an app to queue b1 (label = y), AM container should be launched in nm3
    RMApp app = rm1.submitApp(1 * GB, "app", "user", null, "b1");
   
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    
    // Heartbeat for many times, app1 should get nothing
    for (int i = 0; i < 50; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    }

    Assert.assertTrue(
        "Scheduler diagnostics should have reason for not assigning the node",
        app.getDiagnostics().toString().contains(
            CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_IGNORE_EXCLUSIVE_MODE));

    Assert.assertTrue(
        "Scheduler diagnostics should have last processed node information",
        app.getDiagnostics().toString().contains(
            CSAMContainerLaunchDiagnosticsConstants.LAST_NODE_PROCESSED_MSG
                + nodeIdStr + " ( Partition : [x]"));
    Assert.assertEquals(0, cs.getSchedulerNode(nm1.getNodeId())
        .getNumContainers());
    
    rm1.close();
  }

  @Test(timeout=60000)
  public void
      testQueueMaxCapacitiesWillNotBeHonoredWhenNotRespectingExclusivity()
          throws Exception {
    /**
     * Test case: have a following queue structure:
     * 
     * <pre>
     *            root
     *         /      \
     *        a        b
     *        (x)     (x)
     * </pre>
     * 
     * a/b can access x, both of them has max-capacity-on-x = 50
     * 
     * When doing non-exclusive allocation, app in a (or b) can use 100% of x
     * resource.
     */

    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration(this.conf);

    // Define top-level queues
    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
        "b" });
    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
    csConf.setCapacity(A, 50);
    csConf.setAccessibleNodeLabels(A, toSet("x"));
    csConf.setCapacityByLabel(A, "x", 50);
    csConf.setMaximumCapacityByLabel(A, "x", 50);
    csConf.setUserLimit(A, 200);

    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
    csConf.setCapacity(B, 50);
    csConf.setAccessibleNodeLabels(B, toSet("x"));
    csConf.setCapacityByLabel(B, "x", 50);
    csConf.setMaximumCapacityByLabel(B, "x", 50);
    csConf.setUserLimit(B, 200);

    // set node -> label
    mgr.addToCluserNodeLabels(ImmutableSet.of(
        NodeLabel.newInstance("x", false)));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

    // inject node label manager
    MockRM rm1 = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <empty>

    // app1 -> a
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);

    // app1 asks for 10 partition= containers
    am1.allocate("*", 1 * GB, 10, new ArrayList<ContainerId>());

    // NM1 do 50 heartbeats
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());

    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());

    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    for (int i = 0; i < 50; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    }
    
    // app1 gets all resource in partition=x
    Assert.assertEquals(10, schedulerNode1.getNumContainers());

    // check non-exclusive containers of LeafQueue is correctly updated
    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
    Assert.assertFalse(leafQueue.getIgnoreExclusivityRMContainers().containsKey(
        "y"));
    Assert.assertEquals(10,
        leafQueue.getIgnoreExclusivityRMContainers().get("x").size());

    // completes all containers of app1, ignoreExclusivityRMContainers should be
    // updated as well.
    cs.handle(new AppAttemptRemovedSchedulerEvent(
        am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false));
    Assert.assertFalse(leafQueue.getIgnoreExclusivityRMContainers().containsKey(
        "x"));

    rm1.close();
  }
  
  private void checkQueueUsedCapacity(String queueName, CapacityScheduler cs,
      String nodePartition, float usedCapacity, float absoluteUsedCapacity) {
    float epsilon = 1e-6f;
    CSQueue queue = cs.getQueue(queueName);
    Assert.assertNotNull("Failed to get queue=" + queueName, queue);

    Assert.assertEquals(usedCapacity, queue.getQueueCapacities()
        .getUsedCapacity(nodePartition), epsilon);
    Assert.assertEquals(absoluteUsedCapacity, queue.getQueueCapacities()
        .getAbsoluteUsedCapacity(nodePartition), epsilon);
  }
  
  private void doNMHeartbeat(MockRM rm, NodeId nodeId, int nHeartbeat) {
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nodeId);
    for (int i = 0; i < nHeartbeat; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    }
  }

  private void waitSchedulerNodeJoined(MockRM rm, int expectedNodeNum)
      throws InterruptedException {
    int totalWaitTick = 100; // wait 10 sec at most.
    while (expectedNodeNum > rm.getResourceScheduler().getNumClusterNodes()
        && totalWaitTick > 0) {
      Thread.sleep(100);
      totalWaitTick--;
    }
  }

  private void waitSchedulerNodeHasUpdatedLabels(CapacityScheduler cs,
      MockNM nm, String partition) throws InterruptedException {
    FiCaSchedulerNode node = cs.getNode(nm.getNodeId());
    int totalWaitTick = 20; // wait 2 sec at most.
    while (!node.getLabels().contains(partition)
        && totalWaitTick > 0) {
      Thread.sleep(100);
      totalWaitTick--;
    }
  }

  @Test
  public void testQueueUsedCapacitiesUpdate()
          throws Exception {
    /**
     * Test case: have a following queue structure:
     * 
     * <pre>
     *            root
     *         /      \
     *        a        b
     *       / \      (x)
     *      a1  a2
     *     (x)  (x)
     * </pre>
     * 
     * Both a/b can access x, we need to verify when
     * <pre>
     * 1) container allocated/released in both partitioned/non-partitioned node, 
     * 2) clusterResource updates
     * 3) queue guaranteed resource changed
     * </pre>
     * 
     * used capacity / absolute used capacity of queues are correctly updated.
     */

    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration(this.conf);

    // Define top-level queues
    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
        "b" });
    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

    /**
     * Initially, we set A/B's resource 50:50
     */
    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
    csConf.setCapacity(A, 50);
    csConf.setAccessibleNodeLabels(A, toSet("x"));
    csConf.setCapacityByLabel(A, "x", 50);
    
    csConf.setQueues(A, new String[] { "a1", "a2" });
    
    final String A1 = A + ".a1";
    csConf.setCapacity(A1, 50);
    csConf.setAccessibleNodeLabels(A1, toSet("x"));
    csConf.setCapacityByLabel(A1, "x", 50);
    
    final String A2 = A + ".a2";
    csConf.setCapacity(A2, 50);
    csConf.setAccessibleNodeLabels(A2, toSet("x"));
    csConf.setCapacityByLabel(A2, "x", 50);

    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
    csConf.setCapacity(B, 50);
    csConf.setAccessibleNodeLabels(B, toSet("x"));
    csConf.setCapacityByLabel(B, "x", 50);

    // set node -> label
    mgr.addToCluserNodeLabels(ImmutableSet.of(
        NodeLabel.newInstance("x", false)));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

    // inject node label manager
    MockRM rm = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

    /*
     * Before we adding any node to the cluster, used-capacity/abs-used-capacity
     * should be 0
     */
    checkQueueUsedCapacity("a", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("a", cs, "", 0f, 0f);
    checkQueueUsedCapacity("a1", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("a1", cs, "", 0f, 0f);
    checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
    checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("b", cs, "", 0f, 0f);
    checkQueueUsedCapacity("root", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("root", cs, "", 0f, 0f);
    
    MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x
    MockNM nm2 = rm.registerNode("h2:1234", 10 * GB); // label = <empty>
    
    /*
     * After we adding nodes to the cluster, and before starting to use them,
     * used-capacity/abs-used-capacity should be 0
     */
    checkQueueUsedCapacity("a", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("a", cs, "", 0f, 0f);
    checkQueueUsedCapacity("a1", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("a1", cs, "", 0f, 0f);
    checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
    checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("b", cs, "", 0f, 0f);
    checkQueueUsedCapacity("root", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("root", cs, "", 0f, 0f);

    // app1 -> a1
    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);

    // app1 asks for 1 partition= containers
    am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
    
    doNMHeartbeat(rm, nm2.getNodeId(), 10);
    
    // Now check usage, app1 uses:
    //   a1: used(no-label) = 80%
    //       abs-used(no-label) = 20%
    //   a: used(no-label) = 40%
    //       abs-used(no-label) = 20%
    //   root: used(no-label) = 20%
    //       abs-used(no-label) = 20%
    checkQueueUsedCapacity("a", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("a", cs, "", 0.4f, 0.2f);
    checkQueueUsedCapacity("a1", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f);
    checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
    checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("b", cs, "", 0f, 0f);
    checkQueueUsedCapacity("root", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("root", cs, "", 0.2f, 0.2f);
    
    // app1 asks for 2 partition=x containers
    am1.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>(), "x");
    doNMHeartbeat(rm, nm1.getNodeId(), 10);
    
    // Now check usage, app1 uses:
    //   a1: used(x) = 80%
    //       abs-used(x) = 20%
    //   a: used(x) = 40%
    //       abs-used(x) = 20%
    //   root: used(x) = 20%
    //       abs-used(x) = 20%
    checkQueueUsedCapacity("a", cs, "x", 0.4f, 0.2f);
    checkQueueUsedCapacity("a", cs, "", 0.4f, 0.2f);
    checkQueueUsedCapacity("a1", cs, "x", 0.8f, 0.2f);
    checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f);
    checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
    checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("b", cs, "", 0f, 0f);
    checkQueueUsedCapacity("root", cs, "x", 0.2f, 0.2f);
    checkQueueUsedCapacity("root", cs, "", 0.2f, 0.2f);
    
    // submit an app to a2, uses 1 NON_PARTITIONED container and 1 PARTITIONED
    // container
    // app2 -> a2
    RMApp app2 = rm.submitApp(1 * GB, "app", "user", null, "a2");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);

    // app1 asks for 1 partition= containers
    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
    doNMHeartbeat(rm, nm1.getNodeId(), 10);
    
    // Now check usage, app1 uses:
    //   a2: used(x) = 40%
    //       abs-used(x) = 10%
    //   a: used(x) = 20%
    //       abs-used(x) = 10%
    //   root: used(x) = 10%
    //       abs-used(x) = 10%
    checkQueueUsedCapacity("a", cs, "x", 0.6f, 0.3f);
    checkQueueUsedCapacity("a", cs, "", 0.6f, 0.3f);
    checkQueueUsedCapacity("a1", cs, "x", 0.8f, 0.2f);
    checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f);
    checkQueueUsedCapacity("a2", cs, "x", 0.4f, 0.1f);
    checkQueueUsedCapacity("a2", cs, "", 0.4f, 0.1f);
    checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("b", cs, "", 0f, 0f);
    checkQueueUsedCapacity("root", cs, "x", 0.3f, 0.3f);
    checkQueueUsedCapacity("root", cs, "", 0.3f, 0.3f);
    
    // Add nm3/nm4, double resource for both partitioned/non-partitioned
    // resource, used capacity should be 1/2 of before
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h3", 0), toSet("x")));
    MockNM nm3 = rm.registerNode("h3:1234", 10 * GB); // label = x
    MockNM nm4 = rm.registerNode("h4:1234", 10 * GB); // label = <empty>

    waitSchedulerNodeJoined(rm, 4);
    waitSchedulerNodeHasUpdatedLabels(cs, nm3, "x");
    waitSchedulerNodeHasUpdatedLabels(cs, nm4, "");

    checkQueueUsedCapacity("a", cs, "x", 0.3f, 0.15f);
    checkQueueUsedCapacity("a", cs, "", 0.3f, 0.15f);
    checkQueueUsedCapacity("a1", cs, "x", 0.4f, 0.1f);
    checkQueueUsedCapacity("a1", cs, "", 0.4f, 0.1f);
    checkQueueUsedCapacity("a2", cs, "x", 0.2f, 0.05f);
    checkQueueUsedCapacity("a2", cs, "", 0.2f, 0.05f);
    checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("b", cs, "", 0f, 0f);
    checkQueueUsedCapacity("root", cs, "x", 0.15f, 0.15f);
    checkQueueUsedCapacity("root", cs, "", 0.15f, 0.15f);
    
    // Reinitialize queue, makes A's capacity double, and B's capacity to be 0
    csConf.setCapacity(A, 100); // was 50
    csConf.setCapacityByLabel(A, "x", 100); // was 50
    csConf.setCapacity(B, 0); // was 50
    csConf.setCapacityByLabel(B, "x", 0); // was 50
    cs.reinitialize(csConf, rm.getRMContext());
    
    checkQueueUsedCapacity("a", cs, "x", 0.15f, 0.15f);
    checkQueueUsedCapacity("a", cs, "", 0.15f, 0.15f);
    checkQueueUsedCapacity("a1", cs, "x", 0.2f, 0.1f);
    checkQueueUsedCapacity("a1", cs, "", 0.2f, 0.1f);
    checkQueueUsedCapacity("a2", cs, "x", 0.1f, 0.05f);
    checkQueueUsedCapacity("a2", cs, "", 0.1f, 0.05f);
    checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("b", cs, "", 0f, 0f);
    checkQueueUsedCapacity("root", cs, "x", 0.15f, 0.15f);
    checkQueueUsedCapacity("root", cs, "", 0.15f, 0.15f);
    
    // Release all task containers from a1, check usage
    am1.allocate(null, Arrays.asList(
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2),
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3),
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 4)));
    checkQueueUsedCapacity("a", cs, "x", 0.05f, 0.05f);
    checkQueueUsedCapacity("a", cs, "", 0.10f, 0.10f);
    checkQueueUsedCapacity("a1", cs, "x", 0.0f, 0.0f);
    checkQueueUsedCapacity("a1", cs, "", 0.1f, 0.05f);
    checkQueueUsedCapacity("a2", cs, "x", 0.1f, 0.05f);
    checkQueueUsedCapacity("a2", cs, "", 0.1f, 0.05f);
    checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
    checkQueueUsedCapacity("b", cs, "", 0f, 0f);
    checkQueueUsedCapacity("root", cs, "x", 0.05f, 0.05f);
    checkQueueUsedCapacity("root", cs, "", 0.10f, 0.10f);

    rm.close();
  }

  @Test
  public void testOrderOfAllocationOnPartitions()
          throws Exception {
    /**
     * Test case: have a following queue structure:
     * 
     * <pre>
     *                root
     *          ________________
     *         /     |     \    \
     *        a (x)  b (x)  c    d
     * </pre>
     * 
     * Both a/b can access x, we need to verify when
     * <pre>
     * When doing allocation on partitioned nodes,
     *    - Queue has accessibility to the node will go first
     *    - When accessibility is same
     *      - Queue has less used_capacity on given partition will go first
     *      - When used_capacity is same
     *        - Queue has more abs_capacity will go first
     * </pre>
     * 
     * used capacity / absolute used capacity of queues are correctly updated.
     */

    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration(this.conf);

    // Define top-level queues
    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
        "b", "c", "d" });
    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
    csConf.setCapacity(A, 25);
    csConf.setAccessibleNodeLabels(A, toSet("x"));
    csConf.setCapacityByLabel(A, "x", 30);

    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
    csConf.setCapacity(B, 25);
    csConf.setAccessibleNodeLabels(B, toSet("x"));
    csConf.setCapacityByLabel(B, "x", 70);
    
    final String C = CapacitySchedulerConfiguration.ROOT + ".c";
    csConf.setAccessibleNodeLabels(C, Collections.<String> emptySet());
    csConf.setCapacity(C, 25);
    
    final String D = CapacitySchedulerConfiguration.ROOT + ".d";
    csConf.setAccessibleNodeLabels(D, Collections.<String> emptySet());
    csConf.setCapacity(D, 25);

    // set node -> label
    mgr.addToCluserNodeLabels(ImmutableSet.of(
        NodeLabel.newInstance("x", false)));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

    // inject node label manager
    MockRM rm = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

    MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x
    MockNM nm2 = rm.registerNode("h2:1234", 10 * GB); // label = <empty>
    
    // app1 -> a
    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
    
    // app2 -> b
    RMApp app2 = rm.submitApp(1 * GB, "app", "user", null, "b");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
    
    // app3 -> c
    RMApp app3 = rm.submitApp(1 * GB, "app", "user", null, "c");
    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm2);
    
    // app4 -> d
    RMApp app4 = rm.submitApp(1 * GB, "app", "user", null, "d");
    MockAM am4 = MockRM.launchAndRegisterAM(app4, rm, nm2);

    // Test case 1
    // Both a/b has used_capacity(x) = 0, when doing exclusive allocation, b
    // will go first since b has more capacity(x)
    am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
    doNMHeartbeat(rm, nm1.getNodeId(), 1);
    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
    
    // Test case 2
    // Do another allocation, a will go first since it has 0 use_capacity(x) and
    // b has 1/7 used_capacity(x)
    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
    doNMHeartbeat(rm, nm1.getNodeId(), 1);
    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
    
    // Test case 3
    // Just like above, when doing non-exclusive allocation, b will go first as well.
    am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "");
    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "");
    doNMHeartbeat(rm, nm1.getNodeId(), 2);
    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
    
    // Test case 4
    // After b allocated, we should be able to allocate non-exlusive container in a
    doNMHeartbeat(rm, nm1.getNodeId(), 2);
    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
    
    // Test case 5
    // b/c/d asks non-exclusive container together, b will go first irrelated to
    // used_capacity(x)
    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "");
    am3.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>(), "");
    am4.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>(), "");
    doNMHeartbeat(rm, nm1.getNodeId(), 2);
    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
    checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
    checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(),
        cs.getApplicationAttempt(am3.getApplicationAttemptId()));
    checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(),
        cs.getApplicationAttempt(am4.getApplicationAttemptId()));
    
    // Test case 6
    // After b allocated, c will go first by lexicographic order
    doNMHeartbeat(rm, nm1.getNodeId(), 1);
    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
    checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
        cs.getApplicationAttempt(am3.getApplicationAttemptId()));
    checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(),
        cs.getApplicationAttempt(am4.getApplicationAttemptId()));
    
    // Test case 7
    // After c allocated, d will go first because it has less used_capacity(x)
    // than c
    doNMHeartbeat(rm, nm1.getNodeId(), 1);
    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
    checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
        cs.getApplicationAttempt(am3.getApplicationAttemptId()));
    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
        cs.getApplicationAttempt(am4.getApplicationAttemptId()));
    
    // Test case 8
    // After d allocated, c will go first, c/d has same use_capacity(x), so compare c/d's lexicographic order
    doNMHeartbeat(rm, nm1.getNodeId(), 1);
    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
    checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
        cs.getApplicationAttempt(am3.getApplicationAttemptId()));
    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
        cs.getApplicationAttempt(am4.getApplicationAttemptId()));

  }

  @Test
  public void testOrderOfAllocationOnPartitionsWhenAccessibilityIsAll()
      throws Exception {
    /**
     * Test case: have a following queue structure:
     *
     * <pre>
     *             root
     *          __________
     *         /          \
     *        a (*)      b (x)
     * </pre>
     *
     * Both queues a/b can access x, we need to verify whether * accessibility
     * is considered in ordering of queues
     */

    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration(this.conf);

    // Define top-level queues
    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
        new String[] { "a", "b" });
    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
    csConf.setCapacity(A, 25);
    csConf.setAccessibleNodeLabels(A, toSet("*"));
    csConf.setCapacityByLabel(A, "x", 60);

    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
    csConf.setCapacity(B, 75);
    csConf.setAccessibleNodeLabels(B, toSet("x"));
    csConf.setCapacityByLabel(B, "x", 40);

    // set node -> label
    mgr.addToCluserNodeLabels(
        ImmutableSet.of(NodeLabel.newInstance("x", false)));
    mgr.addLabelsToNode(
        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

    // inject node label manager
    MockRM rm = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();

    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

    MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x

    // app1 -> a
    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a", "x");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);

    // app2 -> b
    RMApp app2 = rm.submitApp(1 * GB, "app", "user", null, "b", "x");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);

    // Both a/b has used_capacity(x) = 0, when doing exclusive allocation, a
    // will go first since a has more capacity(x)
    am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
    doNMHeartbeat(rm, nm1.getNodeId(), 1);
    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
  }

  @Test
  public void testParentQueueMaxCapsAreRespected() throws Exception {
    /*
     * Queue tree:
     *          Root
     *        /     \
     *       A       B
     *      / \
     *     A1 A2
     *
     * A has 50% capacity and 50% max capacity (of label=x)
     * A1/A2 has 50% capacity and 100% max capacity (of label=x)
     * Cluster has one node (label=x) with resource = 24G.
     * So we can at most use 12G resources under queueA.
     */
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration(this.conf);

    // Define top-level queues
    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
        "b"});
    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
    csConf.setCapacity(A, 10);
    csConf.setAccessibleNodeLabels(A, toSet("x"));
    csConf.setCapacityByLabel(A, "x", 50);
    csConf.setMaximumCapacityByLabel(A, "x", 50);

    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
    csConf.setCapacity(B, 90);
    csConf.setAccessibleNodeLabels(B, toSet("x"));
    csConf.setCapacityByLabel(B, "x", 50);
    csConf.setMaximumCapacityByLabel(B, "x", 50);

    // Define 2nd-level queues
    csConf.setQueues(A, new String[] { "a1",
        "a2"});

    final String A1 = A + ".a1";
    csConf.setCapacity(A1, 50);
    csConf.setAccessibleNodeLabels(A1, toSet("x"));
    csConf.setCapacityByLabel(A1, "x", 50);
    csConf.setMaximumCapacityByLabel(A1, "x", 100);
    csConf.setUserLimitFactor(A1, 100.0f);

    final String A2 = A + ".a2";
    csConf.setCapacity(A2, 50);
    csConf.setAccessibleNodeLabels(A2, toSet("x"));
    csConf.setCapacityByLabel(A2, "x", 50);
    csConf.setMaximumCapacityByLabel(A2, "x", 100);
    csConf.setUserLimitFactor(A2, 100.0f);

    // set node -> label
    mgr.addToCluserNodeLabels(ImmutableSet.of(
        NodeLabel.newInstance("x", false)));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

    // inject node label manager
    MockRM rm = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();

    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

    MockNM nm1 =
        new MockNM("h1:1234", 24 * GB, rm.getResourceTrackerService());
    nm1.registerNode();

    // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB
    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1", "x");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
    am1.allocate("*", 4 * GB, 2, new ArrayList<ContainerId>(), "x");
    doNMHeartbeat(rm, nm1.getNodeId(), 10);
    checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
        cs.getApplicationAttempt(am1.getApplicationAttemptId()));

    // Try to launch app2 in a2, asked 2GB, should success
    RMApp app2 = rm.submitApp(2 * GB, "app", "user", null, "a2", "x");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);

    // am2 asks more resources, cannot success because current used = 9G (app1)
    // + 2G (app2) = 11G, and queue's max capacity = 12G
    am2.allocate("*", 2 * GB, 2, new ArrayList<ContainerId>(), "x");

    doNMHeartbeat(rm, nm1.getNodeId(), 10);
    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
  }

  @Test
  public void testQueueMetricsWithLabels() throws Exception {
    /**
     * Test case: have a following queue structure:
     *
     * <pre>
     *            root
     *         /      \
     *        a        b
     *        (x)     (x)
     * </pre>
     *
     * a/b can access x, both of them has max-capacity-on-x = 50
     *
     * When doing non-exclusive allocation, app in a (or b) can use 100% of x
     * resource.
     */

    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
        this.conf);

    // Define top-level queues
    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
        new String[] { "a", "b" });
    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

    final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
    csConf.setCapacity(queueA, 25);
    csConf.setAccessibleNodeLabels(queueA, toSet("x"));
    csConf.setCapacityByLabel(queueA, "x", 50);
    csConf.setMaximumCapacityByLabel(queueA, "x", 50);
    final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
    csConf.setCapacity(queueB, 75);
    csConf.setAccessibleNodeLabels(queueB, toSet("x"));
    csConf.setCapacityByLabel(queueB, "x", 50);
    csConf.setMaximumCapacityByLabel(queueB, "x", 50);

    // set node -> label
    mgr.addToCluserNodeLabels(
        ImmutableSet.of(NodeLabel.newInstance("x", false)));
    mgr.addToCluserNodeLabels(
        ImmutableSet.of(NodeLabel.newInstance("y", false)));
    mgr.addLabelsToNode(
        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
    mgr.addLabelsToNode(
        ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));

    // inject node label manager
    MockRM rm1 = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = y

    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a");
    assertEquals(0 * GB, leafQueueA.getMetrics().getAvailableMB());
    assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
    LeafQueue leafQueueB = (LeafQueue) cs.getQueue("b");
    assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB());
    assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB());

    // app1 -> a
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // app1 asks for 5 partition=x containers
    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
    // NM1 do 50 heartbeats
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());

    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());

    for (int i = 0; i < 50; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    }

    // app1 gets all resource in partition=x
    Assert.assertEquals(5, schedulerNode1.getNumContainers());

    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
        .getNodeReport(nm1.getNodeId());
    Assert.assertEquals(5 * GB, reportNm1.getUsedResource().getMemorySize());
    Assert.assertEquals(5 * GB,
        reportNm1.getAvailableResource().getMemorySize());

    SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
        .getNodeReport(nm2.getNodeId());
    Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize());
    Assert.assertEquals(10 * GB,
        reportNm2.getAvailableResource().getMemorySize());

    assertEquals(0 * GB, leafQueueA.getMetrics().getAvailableMB());
    assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
    assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB());
    assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB());

    // The total memory tracked by QueueMetrics is 0GB for the default partition
    CSQueue rootQueue = cs.getRootQueue();
    assertEquals(0*GB, rootQueue.getMetrics().getAvailableMB() +
        rootQueue.getMetrics().getAllocatedMB());

    // Kill all apps in queue a
    cs.killAllAppsInQueue("a");
    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
    rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());

    assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB());
    assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores());
    rm1.close();
  }

  @Test
  public void testQueueMetricsWithLabelsOnDefaultLabelNode() throws Exception {
    /**
     * Test case: have a following queue structure:
     *
     * <pre>
     *            root
     *         /      \
     *        a        b
     *        (x)     (x)
     * </pre>
     *
     * a/b can access x, both of them has max-capacity-on-x = 50
     *
     * When doing non-exclusive allocation, app in a (or b) can use 100% of x
     * resource.
     */

    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
        this.conf);

    // Define top-level queues
    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
        new String[] { "a", "b" });
    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

    final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
    csConf.setCapacity(queueA, 25);
    csConf.setAccessibleNodeLabels(queueA, toSet("x"));
    csConf.setCapacityByLabel(queueA, "x", 50);
    csConf.setMaximumCapacityByLabel(queueA, "x", 50);
    final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
    csConf.setCapacity(queueB, 75);
    csConf.setAccessibleNodeLabels(queueB, toSet("x"));
    csConf.setCapacityByLabel(queueB, "x", 50);
    csConf.setMaximumCapacityByLabel(queueB, "x", 50);

    // set node -> label
    mgr.addToCluserNodeLabels(
        ImmutableSet.of(NodeLabel.newInstance("x", false)));
    mgr.addLabelsToNode(
        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

    // inject node label manager
    MockRM rm1 = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <no_label>
    // app1 -> a
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);

    // app1 asks for 3 partition= containers
    am1.allocate("*", 1 * GB, 3, new ArrayList<ContainerId>());

    // NM1 do 50 heartbeats
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());

    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
    for (int i = 0; i < 50; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    }

    // app1 gets all resource in partition=x (non-exclusive)
    Assert.assertEquals(3, schedulerNode1.getNumContainers());

    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
        .getNodeReport(nm1.getNodeId());
    Assert.assertEquals(3 * GB, reportNm1.getUsedResource().getMemorySize());
    Assert.assertEquals(7 * GB,
        reportNm1.getAvailableResource().getMemorySize());

    SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
        .getNodeReport(nm2.getNodeId());
    Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize());
    Assert.assertEquals(9 * GB,
        reportNm2.getAvailableResource().getMemorySize());

    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
    // 3GB is used from label x quota. 1.5 GB is remaining from default label.
    // 2GB is remaining from label x.
    assertEquals(15 * GB / 10, leafQueue.getMetrics().getAvailableMB());
    assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB());

    // app1 asks for 1 default partition container
    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());

    // NM2 do couple of heartbeats
    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());

    SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));

    // app1 gets all resource in default partition
    Assert.assertEquals(2, schedulerNode2.getNumContainers());

    // 3GB is used from label x quota. 2GB used from default label.
    // So 0.5 GB is remaining from default label.
    assertEquals(5 * GB / 10, leafQueue.getMetrics().getAvailableMB());
    assertEquals(2 * GB, leafQueue.getMetrics().getAllocatedMB());

    // The total memory tracked by QueueMetrics is 10GB
    // for the default partition
    CSQueue rootQueue = cs.getRootQueue();
    assertEquals(10*GB, rootQueue.getMetrics().getAvailableMB() +
        rootQueue.getMetrics().getAllocatedMB());

    rm1.close();
  }

  @Test
  public void testQueueMetricsWithMixedLabels() throws Exception {
    // There is only one queue which can access both default label and label x.
    // There are two nodes of 10GB label x and 12GB no label.
    // The test is to make sure that the queue metrics is only tracking the
    // allocations and availability from default partition

    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
        this.conf);

    // Define top-level queues
    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
        new String[] {"a"});
    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

    final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
    csConf.setCapacity(queueA, 100);
    csConf.setAccessibleNodeLabels(queueA, toSet("x"));
    csConf.setCapacityByLabel(queueA, "x", 100);
    csConf.setMaximumCapacityByLabel(queueA, "x", 100);

    // set node -> label
    // label x exclusivity is set to true
    mgr.addToCluserNodeLabels(
        ImmutableSet.of(NodeLabel.newInstance("x", true)));
    mgr.addLabelsToNode(
        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

    // inject node label manager
    MockRM rm1 = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
    MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB); // label = <no_label>

    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a");
    assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB());
    assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());

    // app1 -> a
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // app1 asks for 5 partition=x containers
    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
    // NM1 do 50 heartbeats
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());

    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());

    for (int i = 0; i < 50; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    }

    // app1 gets all resource in partition=x
    Assert.assertEquals(6, schedulerNode1.getNumContainers());

    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
        .getNodeReport(nm1.getNodeId());
    Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
    Assert.assertEquals(4 * GB,
        reportNm1.getAvailableResource().getMemorySize());

    SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
        .getNodeReport(nm2.getNodeId());
    Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize());
    Assert.assertEquals(12 * GB,
        reportNm2.getAvailableResource().getMemorySize());

    assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB());
    assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());

    // app2 -> a
    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a", "");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);

    // app2 asks for 5 partition= containers
    am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "");
    // NM2 do 50 heartbeats
    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());

    SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());

    for (int i = 0; i < 50; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
    }

    // app1 gets all resource in partition=x
    Assert.assertEquals(6, schedulerNode2.getNumContainers());

    reportNm1 = rm1.getResourceScheduler().getNodeReport(nm1.getNodeId());
    Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
    Assert.assertEquals(4 * GB,
        reportNm1.getAvailableResource().getMemorySize());

    reportNm2 = rm1.getResourceScheduler().getNodeReport(nm2.getNodeId());
    Assert.assertEquals(6 * GB, reportNm2.getUsedResource().getMemorySize());
    Assert.assertEquals(6 * GB,
        reportNm2.getAvailableResource().getMemorySize());

    assertEquals(6 * GB, leafQueueA.getMetrics().getAvailableMB());
    assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB());

    // The total memory tracked by QueueMetrics is 12GB
    // for the default partition
    CSQueue rootQueue = cs.getRootQueue();
    assertEquals(12*GB, rootQueue.getMetrics().getAvailableMB() +
        rootQueue.getMetrics().getAllocatedMB());

    // Kill all apps in queue a
    cs.killAllAppsInQueue("a");
    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
    rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());

    assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB());
    assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores());
    rm1.close();
  }

  @Test
  public void testQueueMetricsWithLabelsDisableElasticity() throws Exception {
    /**
     * Test case: have a following queue structure:
     *
     * <pre>
     *
     *          root
     *        /      \
     *       a        b
     *      (x)      (x)
     *      / \
     *     a1 a2
     *    (x) (x)
     * </pre>
     *
     * a/b can access x, both of them has max-capacity-on-x = 50
     *
     * When doing non-exclusive allocation, app in a (or b) can use 100% of x
     * resource.
     */

    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
        this.conf);

    // Define top-level queues
    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
        new String[] { "a", "b" });
    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

    final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
    csConf.setCapacity(queueA, 50);
    csConf.setMaximumCapacity(queueA, 100);
    csConf.setAccessibleNodeLabels(queueA, toSet("x"));
    csConf.setCapacityByLabel(queueA, "x", 50);
    csConf.setMaximumCapacityByLabel(queueA, "x", 100);
    final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
    csConf.setCapacity(queueB, 50);
    csConf.setMaximumCapacity(queueB, 100);
    csConf.setAccessibleNodeLabels(queueB, toSet("x"));
    csConf.setCapacityByLabel(queueB, "x", 50);
    csConf.setMaximumCapacityByLabel(queueB, "x", 100);

    // Define 2nd-level queues
    csConf.setQueues(queueA, new String[] { "a1",
        "a2"});

    final String A1 = queueA + ".a1";
    csConf.setCapacity(A1, 20);
    csConf.setMaximumCapacity(A1, 60);
    csConf.setAccessibleNodeLabels(A1, toSet("x"));
    csConf.setCapacityByLabel(A1, "x", 60);
    csConf.setMaximumCapacityByLabel(A1, "x", 30);

    final String A2 = queueA + ".a2";
    csConf.setCapacity(A2, 80);
    csConf.setMaximumCapacity(A2, 40);
    csConf.setAccessibleNodeLabels(A2, toSet("x"));
    csConf.setCapacityByLabel(A2, "x", 40);
    csConf.setMaximumCapacityByLabel(A2, "x", 20);

    // set node -> label
    mgr.addToCluserNodeLabels(
        ImmutableSet.of(NodeLabel.newInstance("x", false)));
    mgr.addLabelsToNode(
        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

    // inject node label manager
    MockRM rm1 = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); // label = x
    // app1 -> a1
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // app1 asks for 6 partition=x containers
    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>(), "x");

    // NM1 do 50 heartbeats
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    doNMHeartbeat(rm1, nm1.getNodeId(), 50);
    checkNumOfContainersInAnAppOnGivenNode(6, nm1.getNodeId(),
        cs.getApplicationAttempt(am1.getApplicationAttemptId()));

    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
        .getNodeReport(nm1.getNodeId());
    Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
    Assert.assertEquals(14 * GB,
        reportNm1.getAvailableResource().getMemorySize());

    // Try to launch app2 in a2, asked 2GB, should success
    // app2 -> a2
    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a2", "x");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);

    // app2 asks for 4 partition=x containers
    am2.allocate("*", 1 * GB, 4, new ArrayList<ContainerId>(), "x");
    // NM1 do 50 heartbeats

    doNMHeartbeat(rm1, nm1.getNodeId(), 50);
    checkNumOfContainersInAnAppOnGivenNode(4, nm1.getNodeId(),
        cs.getApplicationAttempt(am2.getApplicationAttemptId()));

    reportNm1 = rm1.getResourceScheduler()
        .getNodeReport(nm1.getNodeId());
    Assert.assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize());
    Assert.assertEquals(10 * GB,
        reportNm1.getAvailableResource().getMemorySize());

    // Kill all apps in queue a2
    cs.killAllAppsInQueue("a2");
    rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED);
    rm1.waitForAppRemovedFromScheduler(app2.getApplicationId());

    // Try to launch app3 in a2, asked 6GB, should fail
    // app3 -> a2
    RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "a2", "x");
    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);

    am3.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>(), "x");
    // NM1 do 50 heartbeats
    doNMHeartbeat(rm1, nm1.getNodeId(), 50);
    // app3 cannot preempt more resources restricted by disable elasticity
    checkNumOfContainersInAnAppOnGivenNode(4, nm1.getNodeId(),
        cs.getApplicationAttempt(am3.getApplicationAttemptId()));

    Assert.assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize());
    Assert.assertEquals(10 * GB,
        reportNm1.getAvailableResource().getMemorySize());

    // Kill all apps in queue a1
    cs.killAllAppsInQueue("a1");
    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
    rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());

    // app4 -> a1, try to allocate more than 6GB resource, should fail
    RMApp app4 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x");
    MockAM am4 = MockRM.launchAndRegisterAM(app4, rm1, nm1);

    // app3 asks for 7 partition=x containers
    am4.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>(), "x");
    // NM1 do 50 heartbeats
    doNMHeartbeat(rm1, nm1.getNodeId(), 50);

    // app4 should only gets 6GB resource in partition=x
    // since elasticity is disabled
    checkNumOfContainersInAnAppOnGivenNode(6, nm1.getNodeId(),
        cs.getApplicationAttempt(am4.getApplicationAttemptId()));

    Assert.assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize());
    Assert.assertEquals(10 * GB,
        reportNm1.getAvailableResource().getMemorySize());

    rm1.close();
  }
}
