blob: 8fffe6f452c1f7676e712b3b03037d738c3e6d95 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.rm;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.rm.TaskScheduler.CookieContainerRequest;
import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestTaskScheduler {
RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@SuppressWarnings({ "unchecked" })
@Test
public void testTaskScheduler() throws Exception {
RackResolver.init(new YarnConfiguration());
TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
AMRMClientAsync<CookieContainerRequest> mockRMClient =
mock(AMRMClientAsync.class);
String appHost = "host";
int appPort = 0;
String appUrl = "url";
TaskSchedulerWithDrainableAppCallback scheduler = new TaskSchedulerWithDrainableAppCallback(
mockApp, appHost, appPort, appUrl, mockRMClient);
TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
.getDrainableAppCallback();
Configuration conf = new Configuration();
conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
int interval = 100;
conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, interval);
scheduler.init(conf);
drainableAppCallback.drain();
verify(mockRMClient).init(conf);
verify(mockRMClient).setHeartbeatInterval(interval);
RegisterApplicationMasterResponse mockRegResponse =
mock(RegisterApplicationMasterResponse.class);
Resource mockMaxResource = mock(Resource.class);
Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
when(mockRegResponse.getMaximumResourceCapability()).
thenReturn(mockMaxResource);
when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
when(mockRMClient.
registerApplicationMaster(anyString(), anyInt(), anyString())).
thenReturn(mockRegResponse);
scheduler.start();
drainableAppCallback.drain();
verify(mockRMClient).start();
verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
verify(mockApp).setApplicationRegistrationData(mockMaxResource,
mockAcls);
when(mockRMClient.getClusterNodeCount()).thenReturn(5);
Assert.assertEquals(5, scheduler.getClusterNodeCount());
Resource mockClusterResource = mock(Resource.class);
when(mockRMClient.getAvailableResources()).
thenReturn(mockClusterResource);
Assert.assertEquals(mockClusterResource,
mockRMClient.getAvailableResources());
Object mockTask1 = mock(Object.class);
Object mockCookie1 = mock(Object.class);
Resource mockCapability = mock(Resource.class);
String[] hosts = {"host1", "host5"};
String[] racks = {"/default-rack", "/default-rack"};
Priority mockPriority = mock(Priority.class);
ArgumentCaptor<CookieContainerRequest> requestCaptor =
ArgumentCaptor.forClass(CookieContainerRequest.class);
// allocate task
scheduler.allocateTask(mockTask1, mockCapability, hosts,
racks, mockPriority, null, mockCookie1);
drainableAppCallback.drain();
verify(mockRMClient, times(1)).
addContainerRequest((CookieContainerRequest) any());
// returned from task requests before allocation happens
assertFalse(scheduler.deallocateTask(mockTask1, true));
verify(mockApp, times(0)).containerBeingReleased(any(ContainerId.class));
verify(mockRMClient, times(1)).
removeContainerRequest((CookieContainerRequest) any());
verify(mockRMClient, times(0)).
releaseAssignedContainer((ContainerId) any());
// deallocating unknown task
assertFalse(scheduler.deallocateTask(mockTask1, true));
verify(mockApp, times(0)).containerBeingReleased(any(ContainerId.class));
verify(mockRMClient, times(1)).
removeContainerRequest((CookieContainerRequest) any());
verify(mockRMClient, times(0)).
releaseAssignedContainer((ContainerId) any());
// allocate tasks
Object mockTask2 = mock(Object.class);
Object mockCookie2 = mock(Object.class);
Object mockTask3 = mock(Object.class);
Object mockCookie3 = mock(Object.class);
scheduler.allocateTask(mockTask1, mockCapability, hosts,
racks, mockPriority, null, mockCookie1);
drainableAppCallback.drain();
verify(mockRMClient, times(2)).
addContainerRequest(requestCaptor.capture());
CookieContainerRequest request1 = requestCaptor.getValue();
scheduler.allocateTask(mockTask2, mockCapability, hosts,
racks, mockPriority, null, mockCookie2);
drainableAppCallback.drain();
verify(mockRMClient, times(3)).
addContainerRequest(requestCaptor.capture());
CookieContainerRequest request2 = requestCaptor.getValue();
scheduler.allocateTask(mockTask3, mockCapability, hosts,
racks, mockPriority, null, mockCookie3);
drainableAppCallback.drain();
verify(mockRMClient, times(4)).
addContainerRequest(requestCaptor.capture());
CookieContainerRequest request3 = requestCaptor.getValue();
List<Container> containers = new ArrayList<Container>();
Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer1.getNodeId().getHost()).thenReturn("host1");
ContainerId mockCId1 = mock(ContainerId.class);
when(mockContainer1.getId()).thenReturn(mockCId1);
containers.add(mockContainer1);
Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer2.getNodeId().getHost()).thenReturn("host2");
ContainerId mockCId2 = mock(ContainerId.class);
when(mockContainer2.getId()).thenReturn(mockCId2);
containers.add(mockContainer2);
Container mockContainer3 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer3.getNodeId().getHost()).thenReturn("host3");
ContainerId mockCId3 = mock(ContainerId.class);
when(mockContainer3.getId()).thenReturn(mockCId3);
containers.add(mockContainer3);
Container mockContainer4 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer4.getNodeId().getHost()).thenReturn("host4");
ContainerId mockCId4 = mock(ContainerId.class);
when(mockContainer4.getId()).thenReturn(mockCId4);
containers.add(mockContainer4);
ArrayList<CookieContainerRequest> hostContainers =
new ArrayList<CookieContainerRequest>();
hostContainers.add(request1);
hostContainers.add(request2);
hostContainers.add(request3);
ArrayList<CookieContainerRequest> rackContainers =
new ArrayList<CookieContainerRequest>();
rackContainers.add(request2);
rackContainers.add(request3);
ArrayList<CookieContainerRequest> anyContainers =
new ArrayList<CookieContainerRequest>();
anyContainers.add(request3);
final List<ArrayList<CookieContainerRequest>> hostList =
new LinkedList<ArrayList<CookieContainerRequest>>();
hostList.add(hostContainers);
final List<ArrayList<CookieContainerRequest>> rackList =
new LinkedList<ArrayList<CookieContainerRequest>>();
rackList.add(rackContainers);
final List<ArrayList<CookieContainerRequest>> anyList =
new LinkedList<ArrayList<CookieContainerRequest>>();
anyList.add(anyContainers);
final List<ArrayList<CookieContainerRequest>> emptyList =
new LinkedList<ArrayList<CookieContainerRequest>>();
// return all requests for host1
when(
mockRMClient.getMatchingRequests((Priority) any(), eq("host1"),
(Resource) any())).thenAnswer(
new Answer<List<? extends Collection<CookieContainerRequest>>>() {
@Override
public List<? extends Collection<CookieContainerRequest>> answer(
InvocationOnMock invocation) throws Throwable {
return hostList;
}
});
// first request matched by host
// second request matched to rack. RackResolver by default puts hosts in
// /default-rack. We need to workaround by returning rack matches only once
when(
mockRMClient.getMatchingRequests((Priority) any(), eq("/default-rack"),
(Resource) any())).thenAnswer(
new Answer<List<? extends Collection<CookieContainerRequest>>>() {
@Override
public List<? extends Collection<CookieContainerRequest>> answer(
InvocationOnMock invocation) throws Throwable {
return rackList;
}
}).thenAnswer(
new Answer<List<? extends Collection<CookieContainerRequest>>>() {
@Override
public List<? extends Collection<CookieContainerRequest>> answer(
InvocationOnMock invocation) throws Throwable {
return emptyList;
}
});
// third request matched to ANY
when(
mockRMClient.getMatchingRequests((Priority) any(),
eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
new Answer<List<? extends Collection<CookieContainerRequest>>>() {
@Override
public List<? extends Collection<CookieContainerRequest>> answer(
InvocationOnMock invocation) throws Throwable {
return anyList;
}
}).thenAnswer(
new Answer<List<? extends Collection<CookieContainerRequest>>>() {
@Override
public List<? extends Collection<CookieContainerRequest>> answer(
InvocationOnMock invocation) throws Throwable {
return emptyList;
}
});
scheduler.onContainersAllocated(containers);
drainableAppCallback.drain();
// first container allocated
verify(mockApp).taskAllocated(mockTask1, mockCookie1, mockContainer1);
verify(mockApp).taskAllocated(mockTask2, mockCookie2, mockContainer2);
verify(mockApp).taskAllocated(mockTask3, mockCookie3, mockContainer3);
// no other allocations returned
verify(mockApp, times(3)).taskAllocated(any(), any(), (Container) any());
verify(mockRMClient).removeContainerRequest(request1);
verify(mockRMClient).removeContainerRequest(request2);
verify(mockRMClient).removeContainerRequest(request3);
// verify unwanted container released
verify(mockRMClient).releaseAssignedContainer(mockCId4);
// deallocate allocated task
assertTrue(scheduler.deallocateTask(mockTask1, true));
verify(mockApp).containerBeingReleased(mockCId1);
verify(mockRMClient).releaseAssignedContainer(mockCId1);
// deallocate allocated container
Assert.assertEquals(mockTask2, scheduler.deallocateContainer(mockCId2));
verify(mockRMClient).releaseAssignedContainer(mockCId2);
verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId) any());
List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
ContainerStatus mockStatus1 = mock(ContainerStatus.class);
when(mockStatus1.getContainerId()).thenReturn(mockCId1);
statuses.add(mockStatus1);
ContainerStatus mockStatus2 = mock(ContainerStatus.class);
when(mockStatus2.getContainerId()).thenReturn(mockCId2);
statuses.add(mockStatus2);
ContainerStatus mockStatus3 = mock(ContainerStatus.class);
when(mockStatus3.getContainerId()).thenReturn(mockCId3);
statuses.add(mockStatus3);
ContainerStatus mockStatus4 = mock(ContainerStatus.class);
when(mockStatus4.getContainerId()).thenReturn(mockCId4);
statuses.add(mockStatus4);
scheduler.onContainersCompleted(statuses);
drainableAppCallback.drain();
// released container status returned
verify(mockApp).containerCompleted(mockTask1, mockStatus1);
verify(mockApp).containerCompleted(mockTask2, mockStatus2);
// currently allocated container status returned and not released
verify(mockApp).containerCompleted(mockTask3, mockStatus3);
// no other statuses returned
verify(mockApp, times(3)).containerCompleted(any(), (ContainerStatus) any());
verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId) any());
float progress = 0.5f;
when(mockApp.getProgress()).thenReturn(progress);
Assert.assertEquals(progress, scheduler.getProgress(), 0);
List<NodeReport> mockUpdatedNodes = mock(List.class);
scheduler.onNodesUpdated(mockUpdatedNodes);
drainableAppCallback.drain();
verify(mockApp).nodesUpdated(mockUpdatedNodes);
Exception mockException = mock(Exception.class);
scheduler.onError(mockException);
drainableAppCallback.drain();
verify(mockApp).onError(mockException);
scheduler.onShutdownRequest();
drainableAppCallback.drain();
verify(mockApp).appShutdownRequested();
String appMsg = "success";
AppFinalStatus finalStatus =
new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
scheduler.stop();
drainableAppCallback.drain();
verify(mockRMClient).
unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
appMsg, appUrl);
verify(mockRMClient).stop();
scheduler.close();
}
@SuppressWarnings("unchecked")
@Test
public void testTaskSchedulerPreemption() throws Exception {
RackResolver.init(new YarnConfiguration());
TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
AMRMClientAsync<CookieContainerRequest> mockRMClient =
mock(AMRMClientAsync.class);
String appHost = "host";
int appPort = 0;
String appUrl = "url";
TaskSchedulerWithDrainableAppCallback scheduler = new TaskSchedulerWithDrainableAppCallback(
mockApp, appHost, appPort, appUrl, mockRMClient);
TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
.getDrainableAppCallback();
Configuration conf = new Configuration();
scheduler.init(conf);
RegisterApplicationMasterResponse mockRegResponse =
mock(RegisterApplicationMasterResponse.class);
when(
mockRMClient.registerApplicationMaster(anyString(), anyInt(),
anyString())).thenReturn(mockRegResponse);
scheduler.start();
Resource totalResource = Resource.newInstance(4000, 4);
when(mockRMClient.getAvailableResources()).thenReturn(totalResource);
// no preemption
scheduler.getProgress();
drainableAppCallback.drain();
Assert.assertEquals(totalResource, scheduler.getTotalResources());
verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
// allocate task
Object mockTaskPri1 = mock(Object.class);
Object mockTaskPri2 = mock(Object.class);
Object mockTaskPri3 = mock(Object.class);
Object mockTaskPri3Wait = mock(Object.class);
Object mockTaskPri3Kill = mock(Object.class);
Priority pri1 = Priority.newInstance(1);
Priority pri2 = Priority.newInstance(2);
Priority pri3 = Priority.newInstance(3);
ArgumentCaptor<CookieContainerRequest> requestCaptor =
ArgumentCaptor.forClass(CookieContainerRequest.class);
final ArrayList<CookieContainerRequest> anyContainers =
new ArrayList<CookieContainerRequest>();
Resource taskAsk = Resource.newInstance(1024, 1);
scheduler.allocateTask(mockTaskPri1, taskAsk, null,
null, pri1, null, null);
drainableAppCallback.drain();
verify(mockRMClient, times(1)).
addContainerRequest(requestCaptor.capture());
anyContainers.add(requestCaptor.getValue());
scheduler.allocateTask(mockTaskPri3, taskAsk, null,
null, pri3, null, null);
drainableAppCallback.drain();
verify(mockRMClient, times(2)).
addContainerRequest(requestCaptor.capture());
anyContainers.add(requestCaptor.getValue());
scheduler.allocateTask(mockTaskPri3Kill, taskAsk, null,
null, pri3, null, null);
drainableAppCallback.drain();
verify(mockRMClient, times(3)).
addContainerRequest(requestCaptor.capture());
anyContainers.add(requestCaptor.getValue());
Resource freeResource = Resource.newInstance(500, 0);
when(mockRMClient.getAvailableResources()).thenReturn(freeResource);
scheduler.getProgress();
drainableAppCallback.drain();
Assert.assertEquals(totalResource, scheduler.getTotalResources());
verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
final List<ArrayList<CookieContainerRequest>> anyList =
new LinkedList<ArrayList<CookieContainerRequest>>();
final List<ArrayList<CookieContainerRequest>> emptyList =
new LinkedList<ArrayList<CookieContainerRequest>>();
anyList.add(anyContainers);
List<Container> containers = new ArrayList<Container>();
Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer1.getNodeId().getHost()).thenReturn("host1");
when(mockContainer1.getResource()).thenReturn(taskAsk);
when(mockContainer1.getPriority()).thenReturn(pri1);
ContainerId mockCId1 = mock(ContainerId.class);
when(mockContainer1.getId()).thenReturn(mockCId1);
containers.add(mockContainer1);
Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer2.getNodeId().getHost()).thenReturn("host1");
when(mockContainer2.getResource()).thenReturn(taskAsk);
when(mockContainer2.getPriority()).thenReturn(pri3);
ContainerId mockCId2 = mock(ContainerId.class);
when(mockContainer2.getId()).thenReturn(mockCId2);
containers.add(mockContainer2);
Container mockContainer3 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer3.getNodeId().getHost()).thenReturn("host1");
when(mockContainer3.getResource()).thenReturn(taskAsk);
when(mockContainer3.getPriority()).thenReturn(pri3);
ContainerId mockCId3 = mock(ContainerId.class);
when(mockContainer3.getId()).thenReturn(mockCId3);
containers.add(mockContainer3);
when(
mockRMClient.getMatchingRequests((Priority) any(), eq("host1"),
(Resource) any())).thenAnswer(
new Answer<List<? extends Collection<CookieContainerRequest>>>() {
@Override
public List<? extends Collection<CookieContainerRequest>> answer(
InvocationOnMock invocation) throws Throwable {
return emptyList;
}
});
// RackResolver by default puts hosts in default-rack
when(
mockRMClient.getMatchingRequests((Priority) any(), eq("/default-rack"),
(Resource) any())).thenAnswer(
new Answer<List<? extends Collection<CookieContainerRequest>>>() {
@Override
public List<? extends Collection<CookieContainerRequest>> answer(
InvocationOnMock invocation) throws Throwable {
return emptyList;
}
});
when(
mockRMClient.getMatchingRequests((Priority) any(),
eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
new Answer<List<? extends Collection<CookieContainerRequest>>>() {
int calls = 0;
@Override
public List<? extends Collection<CookieContainerRequest>> answer(
InvocationOnMock invocation) throws Throwable {
if(calls > 0) {
anyContainers.remove(0);
}
calls++;
return anyList;
}
});
scheduler.onContainersAllocated(containers);
drainableAppCallback.drain();
Assert.assertEquals(3072, scheduler.allocatedResources.getMemory());
Assert.assertEquals(mockCId1,
scheduler.taskAllocations.get(mockTaskPri1).getId());
Assert.assertEquals(mockCId2,
scheduler.taskAllocations.get(mockTaskPri3).getId());
Assert.assertEquals(mockCId3,
scheduler.taskAllocations.get(mockTaskPri3Kill).getId());
// no preemption
scheduler.getProgress();
drainableAppCallback.drain();
verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
scheduler.allocateTask(mockTaskPri3Wait, taskAsk, null,
null, pri3, null, null);
// no preemption
scheduler.getProgress();
drainableAppCallback.drain();
verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
scheduler.allocateTask(mockTaskPri2, taskAsk, null,
null, pri2, null, null);
drainableAppCallback.drain();
// mockTaskPri3Kill gets preempted
scheduler.getProgress();
drainableAppCallback.drain();
verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3);
AppFinalStatus finalStatus =
new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", appUrl);
when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
scheduler.stop();
drainableAppCallback.drain();
scheduler.close();
}
@SuppressWarnings("unchecked")
@Test
public void testLocalityMatching() throws Exception {
RackResolver.init(new Configuration());
TaskSchedulerAppCallback appClient = mock(TaskSchedulerAppCallback.class);
AMRMClientAsync<CookieContainerRequest> amrmClient = mock(AMRMClientAsync.class);
TaskSchedulerWithDrainableAppCallback taskScheduler = new TaskSchedulerWithDrainableAppCallback(
appClient, "host", 0, "", amrmClient);
TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler
.getDrainableAppCallback();
taskScheduler.init(new Configuration());
RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class);
Resource mockMaxResource = mock(Resource.class);
Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
when(mockRegResponse.getMaximumResourceCapability()).thenReturn(
mockMaxResource);
when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
when(amrmClient.registerApplicationMaster(anyString(), anyInt(), anyString()))
.thenReturn(mockRegResponse);
taskScheduler.start();
taskScheduler.serviceInit(new Configuration());
taskScheduler.serviceStart();
Resource resource = Resource.newInstance(1024, 1);
Priority priority = Priority.newInstance(1);
String hostsTask1[] = { "host1" };
String hostsTask2[] = { "non-allocated-host" };
String defaultRack[] = { "/default-rack" };
String otherRack[] = { "/other-rack" };
Object mockTask1 = mock(Object.class);
CookieContainerRequest mockCookie1 = mock(CookieContainerRequest.class,
RETURNS_DEEP_STUBS);
when(mockCookie1.getCookie().getTask()).thenReturn(mockTask1);
Object mockTask2 = mock(Object.class);
CookieContainerRequest mockCookie2 = mock(CookieContainerRequest.class,
RETURNS_DEEP_STUBS);
when(mockCookie2.getCookie().getTask()).thenReturn(mockTask2);
Container containerHost1 = createContainer(1, "host1", resource, priority);
Container containerHost3 = createContainer(2, "host3", resource, priority);
List<Container> allocatedContainers = new LinkedList<Container>();
allocatedContainers.add(containerHost3);
allocatedContainers.add(containerHost1);
final Map<String, List<CookieContainerRequest>> matchingMap = new HashMap<String, List<CookieContainerRequest>>();
taskScheduler.allocateTask(mockTask1, resource, hostsTask1, defaultRack,
priority, null, mockCookie1);
drainableAppCallback.drain();
List<CookieContainerRequest> host1List = new ArrayList<CookieContainerRequest>();
host1List.add(mockCookie1);
List<CookieContainerRequest> defaultRackList = new ArrayList<CookieContainerRequest>();
defaultRackList.add(mockCookie1);
matchingMap.put(hostsTask1[0], host1List);
matchingMap.put(defaultRack[0], defaultRackList);
List<CookieContainerRequest> nonAllocatedHostList = new ArrayList<TaskScheduler.CookieContainerRequest>();
nonAllocatedHostList.add(mockCookie2);
List<CookieContainerRequest> otherRackList = new ArrayList<TaskScheduler.CookieContainerRequest>();
otherRackList.add(mockCookie2);
taskScheduler.allocateTask(mockTask2, resource, hostsTask2, otherRack,
priority, null, mockCookie2);
drainableAppCallback.drain();
matchingMap.put(hostsTask2[0], nonAllocatedHostList);
matchingMap.put(otherRack[0], otherRackList);
List<CookieContainerRequest> anyList = new LinkedList<TaskScheduler.CookieContainerRequest>();
anyList.add(mockCookie1);
anyList.add(mockCookie2);
matchingMap.put(ResourceRequest.ANY, anyList);
final List<ArrayList<CookieContainerRequest>> emptyList = new LinkedList<ArrayList<CookieContainerRequest>>();
when(
amrmClient.getMatchingRequests((Priority) any(), anyString(),
(Resource) any())).thenAnswer(
new Answer<List<? extends Collection<CookieContainerRequest>>>() {
@Override
public List<? extends Collection<CookieContainerRequest>> answer(
InvocationOnMock invocation) throws Throwable {
String location = (String) invocation.getArguments()[1];
if (matchingMap.get(location) != null) {
CookieContainerRequest matched = matchingMap.get(location).get(0);
// Remove matched from matchingMap - assuming TaskScheduler will
// pick the first entry.
Iterator<Entry<String, List<CookieContainerRequest>>> iter = matchingMap
.entrySet().iterator();
while (iter.hasNext()) {
Entry<String, List<CookieContainerRequest>> entry = iter.next();
if (entry.getValue().remove(matched)) {
if (entry.getValue().size() == 0) {
iter.remove();
}
}
}
return Collections.singletonList(Collections
.singletonList(matched));
} else {
return emptyList;
}
}
});
taskScheduler.onContainersAllocated(allocatedContainers);
drainableAppCallback.drain();
ArgumentCaptor<Object> taskCaptor = ArgumentCaptor.forClass(Object.class);
ArgumentCaptor<Container> containerCaptor = ArgumentCaptor
.forClass(Container.class);
verify(appClient, times(2)).taskAllocated(taskCaptor.capture(), any(),
containerCaptor.capture());
// Expected containerHost1 allocated to task1 due to locality,
// containerHost3 allocated to task2.
List<Container> assignedContainers = containerCaptor.getAllValues();
int container1Pos = assignedContainers.indexOf(containerHost1);
assertTrue("Container: " + containerHost1 + " was not assigned",
container1Pos != -1);
assertEquals("Task 1 was not allocated to containerHost1", mockTask1,
taskCaptor.getAllValues().get(container1Pos));
int container2Pos = assignedContainers.indexOf(containerHost3);
assertTrue("Container: " + containerHost3 + " was not assigned",
container2Pos != -1);
assertEquals("Task 2 was not allocated to containerHost3", mockTask2,
taskCaptor.getAllValues().get(container2Pos));
AppFinalStatus finalStatus = new AppFinalStatus(
FinalApplicationStatus.SUCCEEDED, "", "");
when(appClient.getFinalAppStatus()).thenReturn(finalStatus);
taskScheduler.close();
}
private Container createContainer(int id, String host, Resource resource,
Priority priority) {
ContainerId containerID = ContainerId.newInstance(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
id);
NodeId nodeID = NodeId.newInstance(host, 0);
Container container = Container.newInstance(containerID, nodeID, host
+ ":0", resource, priority, null);
return container;
}
}