blob: 2ca7cc812cb319c4ab2dbc136d52442f320e6bf8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.rm;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.EventType;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.HostInfo;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Tests for RMContainerReuseRequestor.
*/
public class TestRMContainerReuseRequestor {
private RMContainerReuseRequestor reuseRequestor;
@Before
public void setup() throws IOException {
RMContainerAllocator allocator = mock(RMContainerAllocator.class);
Job job = mock(Job.class);
Task task = mock(Task.class);
TaskAttempt taskAttempt = mock(TaskAttempt.class);
when(taskAttempt.getShufflePort()).thenReturn(0);
when(task.getAttempt(any(TaskAttemptId.class))).thenReturn(taskAttempt);
when(job.getTask(any(TaskId.class))).thenReturn(task);
when(allocator.getJob()).thenReturn(job);
reuseRequestor = new RMContainerReuseRequestor(null,
allocator);
}
@Test
public void testNoOfTimesEachMapTaskContainerCanReuseWithDefaultConfig() {
// Verify that no of times each map task container can be reused with
// default configuration for
// 'yarn.app.mapreduce.am.container.reuse.max-maptasks'.
testNoOfTimesEachContainerCanReuseWithDefaultConfig(TaskType.MAP,
RMContainerAllocator.PRIORITY_MAP);
}
@Test
public void testNoOfTimesEachMapTaskContainerCanReuseWithConfigLimit() {
// Verify that no of times each map task container can be reused when
// 'yarn.app.mapreduce.am.container.reuse.max-maptasks' configured with a
// value.
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKS, 1);
testNoOfTimesEachContainerCanReuseWithConfigLimit(TaskType.MAP,
RMContainerAllocator.PRIORITY_MAP, conf);
}
@Test
public void testNoOfTimesEachRedTaskContainerCanReuseWithDefaultConfig() {
// Verify that no of times each reduce task container can be reused with
// default configuration for
// 'yarn.app.mapreduce.am.container.reuse.max-reducetasks'.
testNoOfTimesEachContainerCanReuseWithDefaultConfig(TaskType.REDUCE,
RMContainerAllocator.PRIORITY_REDUCE);
}
@Test
public void testNoOfTimesEachRedTaskContainerCanReuseWithConfigLimit() {
// Verify that no of times each map task container can be reused when
// 'yarn.app.mapreduce.am.container.reuse.max-reducetasks' configured with a
// value.
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKS, 1);
testNoOfTimesEachContainerCanReuseWithConfigLimit(TaskType.REDUCE,
RMContainerAllocator.PRIORITY_REDUCE, conf);
}
@Test
public void testNoOfMaxMapTaskContainersCanReuseWithDefaultConfig() {
// Verify that no of maximum map containers can be reused at any time with
// default configuration for
// 'yarn.app.mapreduce.am.container.reuse.max-maptaskcontainers'.
testNoOfMaxContainersCanReuseWithDefaultConfig(TaskType.MAP,
RMContainerAllocator.PRIORITY_MAP);
}
@Test
public void testNoOfMaxMapTaskContainersCanReuseWithConfigLimit() {
// Verify that no of maximum map containers can be reused at any time when
// 'yarn.app.mapreduce.am.container.reuse.max-maptaskcontainers' configured
// with a limit value.
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKCONTAINERS, 1);
testNoOfMaxContainersCanReuseWithConfigLimit(TaskType.MAP,
RMContainerAllocator.PRIORITY_MAP, conf);
}
@Test
public void testNoOfMaxRedTaskContainersCanReuseWithDefaultConfig() {
// Verify that no of maximum reduce containers can be reused at any time
// with default configuration for
// 'yarn.app.mapreduce.am.container.reuse.max-reducetasks'.
testNoOfMaxContainersCanReuseWithDefaultConfig(TaskType.REDUCE,
RMContainerAllocator.PRIORITY_REDUCE);
}
@Test
public void testNoOfMaxRedTaskContainersCanReuseWithConfigLimit() {
// Verify that no of maximum reduce containers can be reused at any time
// when 'yarn.app.mapreduce.am.container.reuse.max-reducetasks' configured
// with a limit value.
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKCONTAINERS, 1);
testNoOfMaxContainersCanReuseWithConfigLimit(TaskType.REDUCE,
RMContainerAllocator.PRIORITY_REDUCE, conf);
}
@Test
public void testContainerFailedOnHost() throws Exception {
reuseRequestor.serviceInit(new Configuration());
Map<Container, HostInfo> containersToReuse = reuseRequestor
.getContainersToReuse();
containersToReuse
.put(newContainerInstance("container_1472171035081_0009_01_000008",
RMContainerAllocator.PRIORITY_REDUCE), new HostInfo("node1", 1999));
containersToReuse
.put(newContainerInstance("container_1472171035081_0009_01_000009",
RMContainerAllocator.PRIORITY_REDUCE), new HostInfo("node2", 1999));
reuseRequestor.getBlacklistedNodes().add("node1");
// It removes all containers from containersToReuse running in node1
reuseRequestor.containerFailedOnHost("node1");
Assert.assertFalse("node1 should not present in reuse containers.",
containersToReuse.containsValue("node1"));
// There will not any change to containersToReuse when there are no
// containers to reuse in that node
reuseRequestor.containerFailedOnHost("node3");
Assert.assertEquals(1, containersToReuse.size());
}
private void testNoOfTimesEachContainerCanReuseWithDefaultConfig(
TaskType taskType, Priority priority) {
// Verify that no of times each container can be reused
// Add 10 container reqs to the requestor
addContainerReqs(priority);
Container container = newContainerInstance(
"container_123456789_0001_01_000002", priority);
for (int i = 0; i < 10; i++) {
JobId jobId = MRBuilderUtils.newJobId(123456789, 1, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, i + 1, taskType);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 1);
ContainerAvailableEvent event = new ContainerAvailableEvent(
EventType.CONTAINER_AVAILABLE, taskAttemptId, container);
reuseRequestor.handle(event);
Map<Container, HostInfo> containersToReuse = reuseRequestor
.getContainersToReuse();
Assert.assertTrue("Container should be added for reuse.",
containersToReuse.containsKey(container));
}
}
private void testNoOfTimesEachContainerCanReuseWithConfigLimit(
TaskType taskType, Priority priority, Configuration conf) {
reuseRequestor.init(conf);
// Add a container request
ContainerRequest req1 = new ContainerRequest(null,
Resource.newInstance(2048, 1), new String[0], new String[0], priority,
null);
reuseRequestor.addContainerReq(req1);
// Add an another container request
ContainerRequest req2 = new ContainerRequest(null,
Resource.newInstance(2048, 1), new String[0], new String[0], priority,
null);
reuseRequestor.addContainerReq(req2);
EventType eventType = EventType.CONTAINER_AVAILABLE;
Container container = newContainerInstance(
"container_123456789_0001_01_000002", priority);
JobId jobId = MRBuilderUtils.newJobId(123456789, 1, 1);
TaskId taskId1 = MRBuilderUtils.newTaskId(jobId, 1, taskType);
TaskAttemptId taskAttemptId1 = MRBuilderUtils.newTaskAttemptId(taskId1, 1);
TaskId taskId2 = MRBuilderUtils.newTaskId(jobId, 2, taskType);
TaskAttemptId taskAttemptId2 = MRBuilderUtils.newTaskAttemptId(taskId2, 1);
ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType,
taskAttemptId1, container);
reuseRequestor.handle(event1);
Map<Container, HostInfo> containersToReuse = reuseRequestor
.getContainersToReuse();
// It is reusing the container
Assert.assertTrue("Container should be added for reuse.",
containersToReuse.containsKey(container));
containersToReuse.clear();
ContainerAvailableEvent event2 = new ContainerAvailableEvent(eventType,
taskAttemptId2, container);
reuseRequestor.handle(event2);
// It should not be reused since it has already reused and limit value is 1.
Assert.assertFalse("Container should not be added for reuse.",
containersToReuse.containsKey(container));
}
private void testNoOfMaxContainersCanReuseWithDefaultConfig(TaskType taskType,
Priority priority) {
// It tests no of times each container can be reused
// Add 10 container reqs to the requestor
addContainerReqs(priority);
for (int i = 0; i < 10; i++) {
Container container = newContainerInstance(
"container_123456789_0001_01_00000" + (i + 2), priority);
JobId jobId = MRBuilderUtils.newJobId(123456789, 1, 1);
TaskId taskId1 = MRBuilderUtils.newTaskId(jobId, i + 1, taskType);
TaskAttemptId taskAttemptId1 = MRBuilderUtils.newTaskAttemptId(taskId1,
1);
ContainerAvailableEvent event1 = new ContainerAvailableEvent(
EventType.CONTAINER_AVAILABLE, taskAttemptId1, container);
reuseRequestor.handle(event1);
Map<Container, HostInfo> containersToReuse = reuseRequestor
.getContainersToReuse();
Assert.assertTrue("Container should be added for reuse.",
containersToReuse.containsKey(container));
}
}
private void testNoOfMaxContainersCanReuseWithConfigLimit(TaskType taskType,
Priority priority, Configuration conf) {
reuseRequestor.init(conf);
ContainerRequest req1 = new ContainerRequest(null,
Resource.newInstance(2048, 1), new String[0], new String[0], priority,
null);
reuseRequestor.addContainerReq(req1);
ContainerRequest req2 = new ContainerRequest(null,
Resource.newInstance(2048, 1), new String[0], new String[0], priority,
null);
reuseRequestor.addContainerReq(req2);
EventType eventType = EventType.CONTAINER_AVAILABLE;
Container container1 = newContainerInstance(
"container_123456789_0001_01_000002", priority);
JobId jobId = MRBuilderUtils.newJobId(123456789, 1, 1);
TaskId taskId1 = MRBuilderUtils.newTaskId(jobId, 1, taskType);
TaskAttemptId taskAttemptId1 = MRBuilderUtils.newTaskAttemptId(taskId1, 1);
TaskId taskId2 = MRBuilderUtils.newTaskId(jobId, 2, taskType);
TaskAttemptId taskAttemptId2 = MRBuilderUtils.newTaskAttemptId(taskId2, 1);
ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType,
taskAttemptId1, container1);
reuseRequestor.handle(event1);
Map<Container, HostInfo> containersToReuse = reuseRequestor
.getContainersToReuse();
Assert.assertTrue("Container should be added for reuse.",
containersToReuse.containsKey(container1));
containersToReuse.clear();
Container container2 = newContainerInstance(
"container_123456789_0001_01_000003", priority);
ContainerAvailableEvent event2 = new ContainerAvailableEvent(eventType,
taskAttemptId2, container2);
reuseRequestor.handle(event2);
Assert.assertFalse("Container should not be added for reuse.",
containersToReuse.containsKey(container2));
}
private void addContainerReqs(Priority priority) {
Configuration conf = new Configuration();
reuseRequestor.init(conf);
for (int i = 0; i < 10; i++) {
ContainerRequest req = new ContainerRequest(null,
Resource.newInstance(2048, 1), new String[0], new String[0], priority,
null);
reuseRequestor.addContainerReq(req);
}
}
private Container newContainerInstance(String containerId,
Priority priority) {
return Container.newInstance(ContainerId.fromString(containerId),
NodeId.newInstance("node1", 8080), "", null, priority, null);
}
@After
public void tearDown() {
reuseRequestor.stop();
}
}