blob: ec8010a06ccd9079b226d0720f89ac5e04a6260d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app2.rm;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app2.AppContext;
import org.apache.hadoop.mapreduce.v2.app2.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app2.job.Job;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor.ContainerRequest;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerAssignTAEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerLaunchRequestEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventNodeCountUpdated;
import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
public class TestRMContainerAllocator {
static final Log LOG = LogFactory.getLog(TestRMContainerAllocator.class);
static final Priority MAP_PRIORITY = RMContainerAllocatorForTest.getMapPriority();
static final Priority REDUCE_PRIORITY = RMContainerAllocatorForTest.getReducePriority();
static final Priority FAST_FAIL_MAP_PRIORITY = RMContainerAllocatorForTest.getFailedMapPriority();
private static final int port = 3333;
/**
* Verifies simple allocation. Matches pending requets to assigned containers.
*/
@Test
public void testSimple() {
LOG.info("Running testSimple");
YarnConfiguration conf = new YarnConfiguration();
TrackingEventHandler eventHandler = new TrackingEventHandler();
AppContext appContext = setupDefaultTestContext(eventHandler, conf);
TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
appContext);
rmComm.init(conf);
rmComm.start();
RMContainerAllocatorForTest scheduler = new RMContainerAllocatorForTest(
rmComm, appContext);
scheduler.init(conf);
scheduler.start();
JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
.getApplicationID()));
AMSchedulerTALaunchRequestEvent event1 = createTALaunchReq(jobId, 1, 1024,
new String[] { "h1" });
AMSchedulerTALaunchRequestEvent event2 = createTALaunchReq(jobId, 2, 1024,
new String[] { "h2" });
AMSchedulerTALaunchRequestEvent event3 = createTALaunchReq(jobId, 3, 1024,
new String[] { "h3" });
scheduler.handleEvent(event1);
scheduler.handleEvent(event2);
scheduler.handleEvent(event3);
assertEquals(3, rmComm.addRequests.size());
Container container1 = newContainer(appContext, 1, "h1", 1024, MAP_PRIORITY);
Container container2 = newContainer(appContext, 2, "h2", 1024, MAP_PRIORITY);
Container container3 = newContainer(appContext, 3, "h3", 1024, MAP_PRIORITY);
List<ContainerId> containerIds = new LinkedList<ContainerId>();
containerIds.add(container1.getId());
containerIds.add(container2.getId());
containerIds.add(container3.getId());
AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
containerIds, false);
scheduler.handleEvent(allocatedEvent);
checkAssignments(new AMSchedulerTALaunchRequestEvent[] { event1, event2,
event3 }, eventHandler.launchRequests, eventHandler.assignEvents, true,
appContext);
}
/**
* Verifies allocation based on resource ask and allocation.
*/
@Test
public void testResource() {
LOG.info("Running testResource");
YarnConfiguration conf = new YarnConfiguration();
TrackingEventHandler eventHandler = new TrackingEventHandler();
AppContext appContext = setupDefaultTestContext(eventHandler, conf);
TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
appContext);
rmComm.init(conf);
rmComm.start();
RMContainerAllocatorForTest scheduler = new RMContainerAllocatorForTest(
rmComm, appContext);
scheduler.init(conf);
scheduler.start();
JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
.getApplicationID()));
AMSchedulerTALaunchRequestEvent event1 = createTALaunchReq(jobId, 1, 1024,
new String[] { "h1" });
AMSchedulerTALaunchRequestEvent event2 = createTALaunchReq(jobId, 2, 2048,
new String[] { "h2" });
// Register the asks with the AMScheduler
scheduler.handleEvent(event1);
scheduler.handleEvent(event2);
assertEquals(2, rmComm.addRequests.size());
Container container1 = newContainer(appContext, 1, "h1", 1024, MAP_PRIORITY);
Container container2 = newContainer(appContext, 2, "h2", 2048, MAP_PRIORITY);
List<ContainerId> containerIds = new LinkedList<ContainerId>();
containerIds.add(container1.getId());
containerIds.add(container2.getId());
AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
containerIds, false);
scheduler.handleEvent(allocatedEvent);
checkAssignments(new AMSchedulerTALaunchRequestEvent[] { event1, event2 },
eventHandler.launchRequests, eventHandler.assignEvents, true,
appContext);
}
@Test
public void testMapReduceScheduling() {
LOG.info("Running testMapReduceScheduling");
YarnConfiguration conf = new YarnConfiguration();
TrackingEventHandler eventHandler = new TrackingEventHandler();
AppContext appContext = setupDefaultTestContext(eventHandler, conf);
TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
appContext);
rmComm.init(conf);
rmComm.start();
RMContainerAllocatorForTest scheduler = new RMContainerAllocatorForTest(
rmComm, appContext);
scheduler.init(conf);
scheduler.start();
JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
.getApplicationID()));
// Map, previously failed.
AMSchedulerTALaunchRequestEvent event1 = createTALaunchReq(jobId, 1, 2048,
new String[] { "h1", "h2" }, true, false);
// Reduce, first attempt.
AMSchedulerTALaunchRequestEvent event2 = createTALaunchReq(jobId, 2, 3000,
new String[] { "h1" }, false, true);
// Map, first attempt.
AMSchedulerTALaunchRequestEvent event3 = createTALaunchReq(jobId, 3, 2048,
new String[] { "h3" }, false, false);
// Register the asks with the AMScheduler
scheduler.handleEvent(event1);
scheduler.handleEvent(event2);
scheduler.handleEvent(event3);
// The reduce should not have been asked for yet - event2
assertEquals(2, rmComm.addRequests.size());
Container container1 = newContainer(appContext, 1, "h1", 1024, MAP_PRIORITY);
Container container2 = newContainer(appContext, 2, "h1", 3072,
REDUCE_PRIORITY);
Container container3 = newContainer(appContext, 3, "h3", 2048, MAP_PRIORITY);
Container container4 = newContainer(appContext, 4, "h1", 2048,
FAST_FAIL_MAP_PRIORITY);
// Container1 - release low mem.
// Container2 - release no pending reduce request.
// Container3 - assign to t3
// Container4 - assign to t1
List<ContainerId> containerIds = new LinkedList<ContainerId>();
containerIds.add(container1.getId());
containerIds.add(container2.getId());
containerIds.add(container3.getId());
containerIds.add(container4.getId());
AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
containerIds, false);
scheduler.handleEvent(allocatedEvent);
// Two stop container events for Container1 and Container2 ?
assertEquals(2, eventHandler.stopEvents.size());
// Since maps have been assigned containers. Verify that a request is sent
// out for the pending reduce task.
assertEquals(3, rmComm.addRequests.size());
// Verify map assignments.
checkAssignments(new AMSchedulerTALaunchRequestEvent[] { event1, event3 },
eventHandler.launchRequests, eventHandler.assignEvents, true,
appContext);
eventHandler.reset();
Container container5 = newContainer(appContext, 5, "h1", 3072,
REDUCE_PRIORITY); // assign to t2
containerIds.clear();
containerIds.add(container5.getId());
allocatedEvent = new AMSchedulerEventContainersAllocated(containerIds,
false);
scheduler.handleEvent(allocatedEvent);
// Verify reduce assignment.
checkAssignments(new AMSchedulerTALaunchRequestEvent[] { event2 },
eventHandler.launchRequests, eventHandler.assignEvents, true,
appContext);
}
@Test
public void testReduceScheduling() throws Exception {
int totalMaps = 10;
int succeededMaps = 1;
int scheduledMaps = 10;
int scheduledReduces = 0;
int assignedMaps = 2;
int assignedReduces = 0;
int mapResourceReqt = 1024;
int reduceResourceReqt = 2*1024;
int numPendingReduces = 4;
float maxReduceRampupLimit = 0.5f;
float reduceSlowStart = 0.2f;
RMContainerAllocator allocator = mock(RMContainerAllocator.class);
doCallRealMethod().when(allocator).
scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(),
anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat());
// Test slow-start
allocator.scheduleReduces(
totalMaps, succeededMaps,
scheduledMaps, scheduledReduces,
assignedMaps, assignedReduces,
mapResourceReqt, reduceResourceReqt,
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator, never()).setIsReduceStarted(true);
// verify slow-start still in effect when no more maps need to
// be scheduled but some have yet to complete
allocator.scheduleReduces(
totalMaps, succeededMaps,
0, scheduledReduces,
totalMaps - succeededMaps, assignedReduces,
mapResourceReqt, reduceResourceReqt,
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator, never()).setIsReduceStarted(true);
verify(allocator, never()).scheduleAllReduces();
succeededMaps = 3;
allocator.scheduleReduces(
totalMaps, succeededMaps,
scheduledMaps, scheduledReduces,
assignedMaps, assignedReduces,
mapResourceReqt, reduceResourceReqt,
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator, times(1)).setIsReduceStarted(true);
// Test reduce ramp-up
doReturn(100 * 1024).when(allocator).getMemLimit();
allocator.scheduleReduces(
totalMaps, succeededMaps,
scheduledMaps, scheduledReduces,
assignedMaps, assignedReduces,
mapResourceReqt, reduceResourceReqt,
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator).rampUpReduces(anyInt());
verify(allocator, never()).rampDownReduces(anyInt());
// Test reduce ramp-down
scheduledReduces = 3;
doReturn(10 * 1024).when(allocator).getMemLimit();
allocator.scheduleReduces(
totalMaps, succeededMaps,
scheduledMaps, scheduledReduces,
assignedMaps, assignedReduces,
mapResourceReqt, reduceResourceReqt,
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator).rampDownReduces(anyInt());
}
@Test
public void testBlackListedNodes() throws Exception {
LOG.info("Running testBlackListedNodes");
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
conf.setInt(
MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
TrackingEventHandler eventHandler = new TrackingEventHandler();
AppContext appContext = setupDefaultTestContext(eventHandler, conf);
TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
appContext);
rmComm.init(conf);
rmComm.start();
RMContainerAllocatorForTest scheduler = new RMContainerAllocatorForTest(
rmComm, appContext);
scheduler.init(conf);
scheduler.start();
JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
.getApplicationID()));
// 3 requests on 3 hosts.
AMSchedulerTALaunchRequestEvent event1 = createTALaunchReq(jobId, 1, 1024,
new String[] { "h1", "h4" });
AMSchedulerTALaunchRequestEvent event2 = createTALaunchReq(jobId, 2, 1024,
new String[] { "h2" });
AMSchedulerTALaunchRequestEvent event3 = createTALaunchReq(jobId, 3, 1024,
new String[] { "h3" });
scheduler.handleEvent(event1);
scheduler.handleEvent(event2);
scheduler.handleEvent(event3);
assertEquals(3, rmComm.addRequests.size());
// 3 containers on 3 hosts.
Container container1 = newContainer(appContext, 1, "h1", 1024, MAP_PRIORITY);
Container container2 = newContainer(appContext, 2, "h2", 1024, MAP_PRIORITY);
Container container3 = newContainer(appContext, 3, "h3", 1024, MAP_PRIORITY);
// Simulate two blacklisted nodes.
appContext.getAllNodes().handle(
new AMNodeEvent(container1.getNodeId(),
AMNodeEventType.N_NODE_WAS_BLACKLISTED));
appContext.getAllNodes().handle(
new AMNodeEvent(container2.getNodeId(),
AMNodeEventType.N_NODE_WAS_BLACKLISTED));
List<ContainerId> containerIds = new LinkedList<ContainerId>();
containerIds.add(container1.getId());
containerIds.add(container2.getId());
containerIds.add(container3.getId());
rmComm.reset();
AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
containerIds, false);
scheduler.handleEvent(allocatedEvent);
// Only contianer 3 should have been assigned.
checkAssignments(new AMSchedulerTALaunchRequestEvent[] { event3 },
eventHandler.launchRequests, eventHandler.assignEvents, true,
appContext);
// Verify container stop events for the remaining two containers.
assertEquals(2, eventHandler.stopEvents.size());
Set<ContainerId> tmpSet = new HashSet<ContainerId>();
tmpSet.add(container1.getId());
tmpSet.add(container2.getId());
for (AMContainerEvent ame : eventHandler.stopEvents) {
tmpSet.remove(ame.getContainerId());
}
assertEquals(0, tmpSet.size());
// Verify new request events were sent out to replace these containers.
assertEquals(2, rmComm.addRequests.size());
// One of the requests should refer to host4.
boolean hostSeen = false;
for (ContainerRequest c : rmComm.addRequests) {
if (c.hosts.length != 0) {
if (!hostSeen) {
assertEquals(1, c.hosts.length);
assertEquals("h4", c.hosts[0]);
hostSeen = true;
} else {
fail("Only one request should have a host");
}
}
}
}
@Test
public void testIgnoreBlacklisting() throws Exception {
LOG.info("Running testIgnoreBlacklisting");
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
conf.setInt(
MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, 33);
TrackingEventHandler eventHandler = new TrackingEventHandler();
AppContext appContext = setupDefaultTestContext(eventHandler, conf);
TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
appContext);
rmComm.init(conf);
rmComm.start();
RMContainerAllocatorForTest scheduler = new RMContainerAllocatorForTest(
rmComm, appContext);
scheduler.init(conf);
scheduler.start();
JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
.getApplicationID()));
int attemptId = 0;
int currentHostNum = 0;
NodeId[] nodeIds = new NodeId[10];
// Add a node.
nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
// known=1, blacklisted=0. IgnoreBlacklisting=false, Assign 1
assertFalse(appContext.getAllNodes().isBlacklistingIgnored());
assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
scheduler, eventHandler, appContext);
assertEquals(1, eventHandler.assignEvents.size());
// Blacklist node1.
blacklistNode(nodeIds[0], appContext);
// known=1, blacklisted=1. IgnoreBlacklisting=true, Assign 1
assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
scheduler, eventHandler, appContext);
assertEquals(1, eventHandler.assignEvents.size());
currentHostNum++;
nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
// Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
scheduler, eventHandler, appContext);
assertEquals(1, eventHandler.assignEvents.size());
currentHostNum++;
nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
// Known=3, blacklisted=1, ignore should be true - assign 1 anyway. (Request
// on non blacklisted)
assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
scheduler, eventHandler, appContext);
assertEquals(1, eventHandler.assignEvents.size());
// Known=3, blacklisted=1, ignore should be true - assign 1 anyway. (Request
// on blacklisted)
assignContainerOnHost(jobId, nodeIds[0], ++attemptId, scheduler,
eventHandler, appContext);
assertEquals(1, eventHandler.assignEvents.size());
currentHostNum++;
nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
// Known=4, blacklisted=1, ignore should be false - assign 1 anyway.
// (Request on non blacklisted)
assertFalse(appContext.getAllNodes().isBlacklistingIgnored());
assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
scheduler, eventHandler, appContext);
assertEquals(1, eventHandler.assignEvents.size());
// Known=4, blacklisted=1, ignore should be false - assign 1 anyway.
// (Request on blacklisted)
assignContainerOnHost(jobId, nodeIds[0], ++attemptId, scheduler,
eventHandler, appContext);
assertEquals(0, eventHandler.assignEvents.size());
// Blacklist node2.
blacklistNode(nodeIds[1], appContext);
// Known=4, blacklisted=2, ignore should be true - assign 1 anyway. (Request
// on blacklisted)
assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
assignContainerOnHost(jobId, nodeIds[0], ++attemptId, scheduler,
eventHandler, appContext);
assertEquals(1, eventHandler.assignEvents.size());
// Blacklist node3. (While ignore is enabled)
blacklistNode(nodeIds[2], appContext);
assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
currentHostNum++;
nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
// Known=4, blacklisted=2, ignore should be true - assign 1 anyway. (Request
// on non-blacklisted)
assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
scheduler, eventHandler, appContext);
assertEquals(1, eventHandler.assignEvents.size());
// Add 5 more nodes.
for (int i = 0; i < 5; i++) {
currentHostNum++;
nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
}
// Known=9, blacklisted=3, ignore should be false - assign 1 on host3.
assertFalse(appContext.getAllNodes().isBlacklistingIgnored());
assignContainerOnHost(jobId, nodeIds[2], ++attemptId, scheduler,
eventHandler, appContext);
assertEquals(0, eventHandler.assignEvents.size());
}
@Test
public void testCompletedTasksRecalculateSchedule() throws Exception {
LOG.info("Running testCompletedTasksRecalculateSchedule");
YarnConfiguration conf = new YarnConfiguration();
TrackingEventHandler eventHandler = new TrackingEventHandler();
AppContext appContext = setupDefaultTestContext(eventHandler, conf);
TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
appContext);
rmComm.init(conf);
rmComm.start();
RecalculateContainerAllocator scheduler = new RecalculateContainerAllocator(
rmComm, appContext);
scheduler.init(conf);
scheduler.start();
JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
.getApplicationID()));
Job job = appContext.getJob(jobId);
doReturn(10).when(job).getTotalMaps();
doReturn(10).when(job).getTotalReduces();
doReturn(2).when(job).getCompletedMaps();
doReturn(0).when(job).getCompletedReduces();
List<ContainerId> containerIds = new LinkedList<ContainerId>();
AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
containerIds, false);
// Ignore the first allocate.
scheduler.handleEvent(allocatedEvent);
// Since nothing changed, recalculate reduce = false.
scheduler.recalculatedReduceSchedule = false;
scheduler.handleEvent(allocatedEvent);
assertFalse("Unexpected recalculate of reduce schedule",
scheduler.recalculatedReduceSchedule);
// Change completedMaps. Recalcualte reduce should be true.
doReturn(1).when(job).getCompletedMaps();
scheduler.handleEvent(allocatedEvent);
assertTrue("Expected recalculate of reduce schedule",
scheduler.recalculatedReduceSchedule);
// Since nothing changed, recalculate reduce = false.
scheduler.recalculatedReduceSchedule = false;
scheduler.handleEvent(allocatedEvent);
assertFalse("Unexpected recalculate of reduce schedule",
scheduler.recalculatedReduceSchedule);
}
// TODO XXX Unit test for AMNode to simulate node health status change.
private void blacklistNode(NodeId nodeId, AppContext appContext) {
appContext.getAllNodes().handle(
new AMNodeEvent(nodeId, AMNodeEventType.N_NODE_WAS_BLACKLISTED));
}
private NodeId addNode(int currentHostNum, AppContext appContext) {
NodeId nodeId = BuilderUtils.newNodeId("h" + currentHostNum, port);
appContext.getAllNodes().nodeSeen(nodeId);
appContext.getAllNodes().handle(
new AMNodeEventNodeCountUpdated(appContext.getAllNodes().size()));
return nodeId;
}
/**
* Generates the events for a TaskAttempt start request and an associated
* container assignment. Resets TrackingEventHandler statistics.
*/
void assignContainerOnHost(JobId jobId, NodeId nodeId, int attemptId,
RMContainerAllocatorForTest scheduler, TrackingEventHandler eventHandler,
AppContext appContext) {
eventHandler.reset();
AMSchedulerTALaunchRequestEvent launchRequest = createTALaunchReq(jobId,
attemptId, 1024, new String[] { nodeId.getHost() });
scheduler.handleEvent(launchRequest);
Container container = newContainer(appContext, attemptId, nodeId.getHost(),
1024, MAP_PRIORITY);
List<ContainerId> containerIds = new LinkedList<ContainerId>();
containerIds.add(container.getId());
AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
containerIds, false);
scheduler.handleEvent(allocatedEvent);
}
/**
* Verify all assignments and launch requests against the requests.
*/
private void checkAssignments(AMSchedulerTALaunchRequestEvent[] requests,
List<AMContainerLaunchRequestEvent> launchRequests,
List<AMContainerAssignTAEvent> assignEvents, boolean checkHostMatch,
AppContext appContext) {
assertNotNull("Containers not assigned", launchRequests);
assertNotNull("Containers not assigned", assignEvents);
assertEquals("LaunchRequest Count not correct", requests.length,
launchRequests.size());
assertEquals("Assigned Count not correct", requests.length,
assignEvents.size());
Set<ContainerId> containerIds = new HashSet<ContainerId>();
// Check for uniqueness of container id launch requests
for (AMContainerLaunchRequestEvent launchRequest : launchRequests) {
containerIds.add(launchRequest.getContainerId());
}
assertEquals("Multiple launch requests for same container id",
assignEvents.size(), containerIds.size());
// Check for uniqueness of container Id assignments.
containerIds.clear();
for (AMContainerAssignTAEvent assignEvent : assignEvents) {
containerIds.add(assignEvent.getContainerId());
}
assertEquals("Assigned container Ids not unique", assignEvents.size(),
containerIds.size());
AMContainerAssignTAEvent assignment = null;
// Check that all requests were assigned a container.
for (AMSchedulerTALaunchRequestEvent request : requests) {
for (AMContainerAssignTAEvent assignEvent : assignEvents) {
if (request.getAttemptID().equals(assignEvent.getTaskAttemptId())) {
assignment = assignEvent;
break;
}
}
checkAssignment(request, assignment, checkHostMatch, appContext);
assignment = null;
}
}
/**
* Verify assignment for a single request / allocation, optionally checking
* for the requested host.
*/
private void checkAssignment(AMSchedulerTALaunchRequestEvent request,
AMContainerAssignTAEvent assignEvent, boolean checkHostMatch,
AppContext appContext) {
Assert.assertNotNull(
"Nothing assigned to attempt " + request.getAttemptID(), assignEvent);
if (checkHostMatch) {
if (request.getHosts().length == 0) {
return;
} else {
Assert.assertTrue(
"Not assigned to requested host",
Arrays.asList(request.getHosts()).contains(
appContext.getAllContainers().get(assignEvent.getContainerId())
.getContainer().getNodeId().getHost()));
}
}
}
/**
* Create containers for allocation. Will also register the associated node
* with the AMNodeMap, and the container with the AMContainerMap.
*/
private Container newContainer(AppContext appContext,
int containerNum, String host, int memory, Priority priority) {
ContainerId containerId = BuilderUtils.newContainerId(appContext.getApplicationAttemptId(),
containerNum);
NodeId nodeId = BuilderUtils.newNodeId(host, port);
appContext.getAllNodes().nodeSeen(nodeId);
Resource resource = BuilderUtils.newResource(memory);
Container container = BuilderUtils.newContainer(containerId, nodeId, host
+ ":8000", resource, priority, null);
appContext.getAllContainers().addContainerIfNew(container);
return container;
}
private AMSchedulerTALaunchRequestEvent createTALaunchReq(JobId jobId,
int taskAttemptId, int memory, String[] hosts) {
return createTALaunchReq(jobId, taskAttemptId, memory, hosts, false, false);
}
private AMSchedulerTALaunchRequestEvent createTALaunchReq(JobId jobId,
int taskAttemptId, int memory, String[] hosts,
boolean earlierFailedAttempt, boolean reduce) {
Resource resource = BuilderUtils.newResource(memory);
TaskId taskId;
if (reduce) {
taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
} else {
taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
}
String[] hostArray;
String[] rackArray;
if (earlierFailedAttempt) {
hostArray = new String[0];
rackArray = new String[0];
} else {
hostArray = hosts;
rackArray = new String[] { NetworkTopology.DEFAULT_RACK };
}
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
taskAttemptId);
AMSchedulerTALaunchRequestEvent event = new AMSchedulerTALaunchRequestEvent(
attemptId, earlierFailedAttempt, resource, null, null, null, null,
hostArray, rackArray);
return event;
}
static class RMContainerAllocatorForTest extends RMContainerAllocator {
public RMContainerAllocatorForTest(ContainerRequestor requestor,
AppContext appContext) {
super(requestor, appContext);
}
@Override
public void start() {
this.job = appContext.getJob(jobId);
}
@Override
protected void handleEvent(AMSchedulerEvent event) {
super.handleEvent(event);
}
@Override
protected boolean shouldProfileTaskAttempt(JobConf conf,
org.apache.hadoop.mapred.Task remoteTask) {
return false;
}
static Priority getMapPriority() {
return BuilderUtils.newPriority(PRIORITY_MAP.getPriority());
}
static Priority getReducePriority() {
return BuilderUtils.newPriority(PRIORITY_REDUCE.getPriority());
}
static Priority getFailedMapPriority() {
return BuilderUtils.newPriority(PRIORITY_FAST_FAIL_MAP.getPriority());
}
}
static class RecalculateContainerAllocator extends
RMContainerAllocatorForTest {
boolean recalculatedReduceSchedule = false;
public RecalculateContainerAllocator(ContainerRequestor requestor,
AppContext appContext) {
super(requestor, appContext);
}
@Override
public void scheduleReduces(int totalMaps, int completedMaps,
int scheduledMaps, int scheduledReduces, int assignedMaps,
int assignedReduces, int mapResourceReqt, int reduceResourceReqt,
int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
recalculatedReduceSchedule = true;
}
@Override
protected boolean shouldProfileTaskAttempt(JobConf conf,
org.apache.hadoop.mapred.Task remoteTask) {
return false;
}
}
class TrackingAMContainerRequestor extends RMContainerRequestor {
List<ContainerRequest> addRequests = new LinkedList<RMContainerRequestor.ContainerRequest>();
List<ContainerRequest> decRequests = new LinkedList<RMContainerRequestor.ContainerRequest>();
private Resource minContainerMemory = Records.newRecord(Resource.class);
private Resource maxContainerMemory = Records.newRecord(Resource.class);
void reset() {
addRequests.clear();
decRequests.clear();
}
public TrackingAMContainerRequestor(AppContext context) {
this(context, 1024, 10240);
}
public TrackingAMContainerRequestor(AppContext context,
int minContainerMemory, int maxContainerMemory) {
super(null, context);
this.minContainerMemory.setMemory(minContainerMemory);
this.maxContainerMemory.setMemory(maxContainerMemory);
}
@Override
public void addContainerReq(ContainerRequest request) {
addRequests.add(request);
}
@Override
public void decContainerReq(ContainerRequest request) {
decRequests.add(request);
}
@Override
public Map<ApplicationAccessType, String> getApplicationAcls() {
return null;
}
@Override
public Resource getAvailableResources() {
return BuilderUtils.newResource(0);
}
@Override
protected Resource getMaxContainerCapability() {
return maxContainerMemory;
}
@Override
protected Resource getMinContainerCapability() {
return minContainerMemory;
}
@Override
public void register() {
}
@Override
public void unregister() {
}
@Override
public void startAllocatorThread() {
}
@Override
public AMRMProtocol createSchedulerProxy() {
return null;
}
}
@SuppressWarnings("rawtypes")
private class TrackingEventHandler implements EventHandler {
List<AMContainerLaunchRequestEvent> launchRequests = new LinkedList<AMContainerLaunchRequestEvent>();
List<AMContainerAssignTAEvent> assignEvents = new LinkedList<AMContainerAssignTAEvent>();
List<AMContainerEvent> stopEvents = new LinkedList<AMContainerEvent>();
List<RMCommunicatorContainerDeAllocateRequestEvent> releaseEvents = new LinkedList<RMCommunicatorContainerDeAllocateRequestEvent>();
@Override
public void handle(Event event) {
if (event.getType() == AMContainerEventType.C_LAUNCH_REQUEST) {
launchRequests.add((AMContainerLaunchRequestEvent)event);
} else if (event.getType() == AMContainerEventType.C_ASSIGN_TA) {
assignEvents.add((AMContainerAssignTAEvent)event);
} else if (event.getType() == AMContainerEventType.C_STOP_REQUEST) {
stopEvents.add((AMContainerEvent)event);
} else if (event.getType() == RMCommunicatorEventType.CONTAINER_DEALLOCATE) {
releaseEvents.add((RMCommunicatorContainerDeAllocateRequestEvent)event);
}
}
public void reset() {
this.launchRequests.clear();
this.assignEvents.clear();
this.stopEvents.clear();
}
}
// TODO Allow specifying the jobId as a parameter
@SuppressWarnings("rawtypes")
private AppContext setupDefaultTestContext(EventHandler eventHandler,
Configuration conf) {
AppContext appContext = mock(AppContext.class);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
JobID id = TypeConverter.fromYarn(appId);
JobId jobId = TypeConverter.toYarn(id);
Job mockJob = mock(Job.class);
when(mockJob.getID()).thenReturn(jobId);
when(mockJob.getProgress()).thenReturn(0.0f);
when(mockJob.getConf()).thenReturn(conf);
Clock clock = new ControlledClock(new SystemClock());
AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
amNodeMap.init(conf);
amNodeMap.start();
AMContainerMap amContainerMap = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
appContext);
amContainerMap.init(conf);
amContainerMap.start();
when(appContext.getAllContainers()).thenReturn(amContainerMap);
when(appContext.getApplicationID()).thenReturn(appId);
when(appContext.getApplicationAttemptId()).thenReturn(appAttemptId);
when(appContext.getEventHandler()).thenReturn(eventHandler);
when(appContext.getJob(jobId)).thenReturn(mockJob);
when(appContext.getClock()).thenReturn(clock);
when(appContext.getAllNodes()).thenReturn(amNodeMap);
when(appContext.getClusterInfo()).thenReturn(
new ClusterInfo(BuilderUtils.newResource(1024), BuilderUtils
.newResource(10240)));
return appContext;
}
// TODO Add a unit test to verify a correct launchContainer invocation with
// security.
}