blob: d1ccc9a6fec19367acc8a62f5ce2507ca20b9f2a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster.CustomNodeManager;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * Verifies, against a {@link MiniYARNCluster}, that the utilization carried in
 * a simulated {@link NodeStatus} becomes visible on the RM side in both the
 * {@link RMNode} and the {@link SchedulerNode}.
 */
public class TestMiniYarnClusterNodeUtilization {
  // Mini YARN cluster setup
  private static final int NUM_RM = 1;
  private static final int NUM_NM = 1;

  // Polling budget while waiting for a heartbeat to reach the RM:
  // 100 checks, 100 ms apart (max 10 seconds)
  private static final int MAX_PROPAGATION_CHECKS = 100;
  private static final long PROPAGATION_CHECK_INTERVAL_MS = 100;

  // Values for the first round
  private static final int CONTAINER_PMEM_1 = 1024;
  private static final int CONTAINER_VMEM_1 = 2048;
  private static final float CONTAINER_CPU_1 = 11.0f;
  private static final int NODE_PMEM_1 = 10240;
  private static final int NODE_VMEM_1 = 20480;
  private static final float NODE_CPU_1 = 51.0f;

  // Values for the second round
  private static final int CONTAINER_PMEM_2 = 2048;
  private static final int CONTAINER_VMEM_2 = 4096;
  private static final float CONTAINER_CPU_2 = 22.0f;
  private static final int NODE_PMEM_2 = 20480;
  private static final int NODE_VMEM_2 = 40960;
  private static final float NODE_CPU_2 = 61.0f;

  private MiniYARNCluster cluster;
  private CustomNodeManager nm;
  private Configuration conf;

  // The fixture most recently handed to the NM; verifySimulatedUtilization()
  // compares the RM-side state against this.
  private NodeStatus nodeStatus;

  /**
   * Starts a mini cluster, checks that an RM became active and installs the
   * first-round NodeStatus fixture on the node manager.
   */
  @Before
  public void setup() {
    conf = new YarnConfiguration();
    conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
    String name = TestMiniYarnClusterNodeUtilization.class.getName();
    cluster = new MiniYARNCluster(name, NUM_RM, NUM_NM, 1, 1);
    cluster.init(conf);
    cluster.start();
    assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());

    nm = (CustomNodeManager)cluster.getNodeManager(0);
    nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), 0,
        CONTAINER_PMEM_1, CONTAINER_VMEM_1, CONTAINER_CPU_1,
        NODE_PMEM_1, NODE_VMEM_1, NODE_CPU_1);
    nm.setNodeStatus(nodeStatus);
  }

  /**
   * Stops the mini cluster started by {@link #setup()} so that one test does
   * not leave a running cluster behind for the next.
   */
  @After
  public void tearDown() {
    if (cluster != null) {
      cluster.stop();
      cluster = null;
    }
  }

  /**
   * Simulates a NM heartbeat using the simulated NodeStatus fixture. Verify
   * both the RMNode and SchedulerNode have been updated with the new
   * utilization.
   */
  @Test(timeout=60000)
  public void testUpdateNodeUtilization()
      throws InterruptedException, IOException, YarnException {
    assertTrue("NMs fail to connect to the RM",
        cluster.waitForNodeManagersToConnect(10000));

    // Give the heartbeat time to propagate to the RM
    verifySimulatedUtilization();

    // Alter utilization
    nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), 0,
        CONTAINER_PMEM_2, CONTAINER_VMEM_2, CONTAINER_CPU_2,
        NODE_PMEM_2, NODE_VMEM_2, NODE_CPU_2);
    nm.setNodeStatus(nodeStatus);

    // Give the heartbeat time to propagate to the RM
    verifySimulatedUtilization();
  }

  /**
   * Trigger the NM to send a heartbeat using the simulated NodeStatus fixture.
   * Verify both the RMNode and SchedulerNode have been updated with the new
   * utilization.
   */
  @Test(timeout=60000)
  public void testMockNodeStatusHeartbeat()
      throws InterruptedException, YarnException {
    assertTrue("NMs fail to connect to the RM",
        cluster.waitForNodeManagersToConnect(10000));

    NodeStatusUpdater updater = nm.getNodeStatusUpdater();
    updater.sendOutofBandHeartBeat();

    // Give the heartbeat time to propagate to the RM
    verifySimulatedUtilization();

    // Alter utilization
    nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), 0,
        CONTAINER_PMEM_2, CONTAINER_VMEM_2, CONTAINER_CPU_2,
        NODE_PMEM_2, NODE_VMEM_2, NODE_CPU_2);
    nm.setNodeStatus(nodeStatus);
    updater.sendOutofBandHeartBeat();

    verifySimulatedUtilization();
  }

  /**
   * Create a NodeStatus test vector.
   * @param nodeId Node identifier.
   * @param responseId Response identifier.
   * @param containerPMem Physical memory of the container.
   * @param containerVMem Virtual memory of the container.
   * @param containerCPU CPU percentage of the container.
   * @param nodePMem Physical memory of the node.
   * @param nodeVMem Virtual memory of the node.
   * @param nodeCPU CPU percentage of the node.
   * @return A healthy NodeStatus with no container statuses that carries the
   *         given containers and node utilization.
   */
  private NodeStatus createNodeStatus(
      NodeId nodeId,
      int responseId,
      int containerPMem,
      int containerVMem,
      float containerCPU,
      int nodePMem,
      int nodeVMem,
      float nodeCPU) {

    // Fake node status with fake utilization
    ResourceUtilization containersUtilization =
        ResourceUtilization.newInstance(containerPMem, containerVMem,
            containerCPU);
    ResourceUtilization nodeUtilization =
        ResourceUtilization.newInstance(nodePMem, nodeVMem, nodeCPU);
    NodeStatus status = NodeStatus.newInstance(
        nodeId,
        responseId,
        new ArrayList<ContainerStatus>(),
        null,
        NodeHealthStatus.newInstance(true, null, 0),
        containersUtilization,
        nodeUtilization,
        null);

    return status;
  }

  /**
   * Verify both the RMNode and SchedulerNode have been updated with the test
   * fixture utilization data. Polls the RM until the fixture values are
   * visible or the polling budget is used up, then asserts on every RM node.
   * @throws InterruptedException If interrupted while waiting between checks.
   */
  private void verifySimulatedUtilization() throws InterruptedException {
    ResourceManager rm = cluster.getResourceManager(0);
    RMContext rmContext = rm.getRMContext();

    ResourceUtilization containersUtilization =
        nodeStatus.getContainersUtilization();
    ResourceUtilization nodeUtilization =
        nodeStatus.getNodeUtilization();

    // Give the heartbeat time to propagate to the RM (max 10 seconds), but
    // stop waiting as soon as everything asserted below is already in place.
    for (int i = 0; i < MAX_PROPAGATION_CHECKS; i++) {
      if (isUtilizationPropagated(rmContext, containersUtilization,
          nodeUtilization)) {
        break;
      }
      Thread.sleep(PROPAGATION_CHECK_INTERVAL_MS);
    }

    // Verify the data is readable from the RM and scheduler nodes
    for (RMNode ni : rmContext.getRMNodes().values()) {
      ResourceUtilization cu = ni.getAggregatedContainersUtilization();
      assertEquals("Containers Utillization not propagated to RMNode",
          containersUtilization, cu);
      ResourceUtilization nu = ni.getNodeUtilization();
      assertEquals("Node Utillization not propagated to RMNode",
          nodeUtilization, nu);

      SchedulerNode scheduler =
          rmContext.getScheduler().getSchedulerNode(ni.getNodeID());
      // Fail with a readable message instead of a NullPointerException
      assertTrue("No SchedulerNode found for " + ni.getNodeID(),
          scheduler != null);
      cu = scheduler.getAggregatedContainersUtilization();
      assertEquals("Containers Utillization not propagated to SchedulerNode",
          containersUtilization, cu);
      nu = scheduler.getNodeUtilization();
      assertEquals("Node Utillization not propagated to SchedulerNode",
          nodeUtilization, nu);
    }
  }

  /**
   * Check whether every RM node, and the scheduler node registered under the
   * same node id, currently reports the expected utilization.
   * @param rmContext RM context to read the RM nodes and scheduler from.
   * @param containersUtilization Expected aggregated containers utilization.
   * @param nodeUtilization Expected node utilization.
   * @return true if at least one RM node exists and all of them, together
   *         with their scheduler nodes, report the expected values.
   */
  private boolean isUtilizationPropagated(RMContext rmContext,
      ResourceUtilization containersUtilization,
      ResourceUtilization nodeUtilization) {
    boolean sawNode = false;
    for (RMNode ni : rmContext.getRMNodes().values()) {
      sawNode = true;
      SchedulerNode scheduler =
          rmContext.getScheduler().getSchedulerNode(ni.getNodeID());
      if (scheduler == null
          || !nodeUtilization.equals(ni.getNodeUtilization())
          || !containersUtilization.equals(
              ni.getAggregatedContainersUtilization())
          || !nodeUtilization.equals(scheduler.getNodeUtilization())
          || !containersUtilization.equals(
              scheduler.getAggregatedContainersUtilization())) {
        return false;
      }
    }
    // An empty node map means nothing has been propagated yet; keep waiting
    return sawNode;
  }
}