MAPREDUCE-6785. ContainerLauncherImpl support for reusing the containers.
Contributed by Naganarasimha G R.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java
index 0f69fa8..41dfb03 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java
@@ -28,6 +28,7 @@
private final Container container;
private final Map<ApplicationAccessType, String> applicationACLs;
+ private int shufflePort = -1;
public TaskAttemptContainerAssignedEvent(TaskAttemptId id,
Container container, Map<ApplicationAccessType, String> applicationACLs) {
@@ -36,6 +37,14 @@
this.applicationACLs = applicationACLs;
}
+ /**
+ * Returns the shuffle port carried by this event.
+ *
+ * @return the value last passed to {@link #setShufflePort(int)}, or
+ *         {@code -1} (the field's initial value) if it was never set
+ */
+ public int getShufflePort() {
+ return shufflePort;
+ }
+
+ /**
+ * Records the shuffle port to deliver together with this assignment.
+ *
+ * @param shufflePort the port to store; ContainerAssignedTransition in
+ *          TaskAttemptImpl requests a remote launch only when this is
+ *          {@code -1}, and otherwise moves the attempt directly to RUNNING
+ */
+ public void setShufflePort(int shufflePort) {
+ this.shufflePort = shufflePort;
+ }
+
public Container getContainer() {
return this.container;
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 3943a3a..1f9ea90 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -265,7 +265,8 @@
// Transitions from the UNASSIGNED state.
.addTransition(TaskAttemptStateInternal.UNASSIGNED,
- TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
+ EnumSet.of(TaskAttemptStateInternal.ASSIGNED,
+ TaskAttemptStateInternal.RUNNING), TaskAttemptEventType.TA_ASSIGNED,
new ContainerAssignedTransition())
.addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
@@ -1876,13 +1877,14 @@
}
private static class ContainerAssignedTransition implements
- SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent,
+ TaskAttemptStateInternal> {
@SuppressWarnings({ "unchecked" })
@Override
- public void transition(final TaskAttemptImpl taskAttempt,
- TaskAttemptEvent event) {
- final TaskAttemptContainerAssignedEvent cEvent =
- (TaskAttemptContainerAssignedEvent) event;
+ public TaskAttemptStateInternal transition(
+ final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
+ final TaskAttemptContainerAssignedEvent cEvent =
+ (TaskAttemptContainerAssignedEvent) event;
Container container = cEvent.getContainer();
taskAttempt.container = container;
// this is a _real_ Task (classic Hadoop mapred flavor):
@@ -1895,20 +1897,26 @@
taskAttempt.remoteTask, taskAttempt.jvmID);
taskAttempt.computeRackAndLocality();
-
- //launch the container
- //create the container object to be launched for a given Task attempt
- ContainerLaunchContext launchContext = createContainerLaunchContext(
- cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
- taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
- taskAttempt.taskAttemptListener, taskAttempt.credentials);
- taskAttempt.eventHandler
- .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
- launchContext, container, taskAttempt.remoteTask));
- // send event to speculator that our container needs are satisfied
- taskAttempt.eventHandler.handle
- (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
+ if (cEvent.getShufflePort() == -1) {
+ // launch the container
+ // create the container object to be launched for a given Task attempt
+ ContainerLaunchContext launchContext = createContainerLaunchContext(
+ cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
+ taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
+ taskAttempt.taskAttemptListener, taskAttempt.credentials);
+ taskAttempt.eventHandler
+ .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
+ launchContext, container, taskAttempt.remoteTask));
+
+ // send event to speculator that our container needs are satisfied
+ taskAttempt.eventHandler
+ .handle(new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
+ return TaskAttemptStateInternal.ASSIGNED;
+ } else {
+ taskAttempt.onContainerLaunch(cEvent.getShufflePort());
+ return TaskAttemptStateInternal.RUNNING;
+ }
}
}
@@ -1982,7 +1990,6 @@
private static class LaunchedContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
- @SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent evnt) {
@@ -1990,34 +1997,34 @@
TaskAttemptContainerLaunchedEvent event =
(TaskAttemptContainerLaunchedEvent) evnt;
- //set the launch time
- taskAttempt.launchTime = taskAttempt.clock.getTime();
- taskAttempt.shufflePort = event.getShufflePort();
-
- // register it to TaskAttemptListener so that it can start monitoring it.
- taskAttempt.taskAttemptListener
- .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
-
- //TODO Resolve to host / IP in case of a local address.
- InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr?
- NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
- taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
- taskAttempt.httpPort = nodeHttpInetAddr.getPort();
- taskAttempt.sendLaunchedEvents();
- taskAttempt.eventHandler.handle
- (new SpeculatorEvent
- (taskAttempt.attemptId, true, taskAttempt.clock.getTime()));
- //make remoteTask reference as null as it is no more needed
- //and free up the memory
- taskAttempt.remoteTask = null;
-
- //tell the Task that attempt has started
- taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
- taskAttempt.attemptId,
- TaskEventType.T_ATTEMPT_LAUNCHED));
+ taskAttempt.onContainerLaunch(event.getShufflePort());
}
}
-
+
+ /**
+  * Bookkeeping performed once this attempt has a running container. It is
+  * shared by LaunchedContainerTransition and by the branch of
+  * ContainerAssignedTransition that receives a shuffle port with the
+  * assignment.
+  *
+  * @param shufflePortParam shuffle port to record for this attempt
+  */
+ @SuppressWarnings("unchecked")
+ private void onContainerLaunch(int shufflePortParam) {
+   // Stamp the launch time and remember the shuffle port.
+   this.launchTime = this.clock.getTime();
+   this.shufflePort = shufflePortParam;
+
+   // Register with the TaskAttemptListener so that it can start monitoring
+   // this attempt.
+   this.taskAttemptListener.registerLaunchedTask(this.attemptId, this.jvmID);
+
+   // TODO Resolve to host / IP in case of a local address.
+   // TODO: Costly to create sock-addr?
+   final String nodeHttpAddress = this.container.getNodeHttpAddress();
+   final InetSocketAddress httpSocketAddress =
+       NetUtils.createSocketAddr(nodeHttpAddress);
+   this.trackerName = httpSocketAddress.getHostName();
+   this.httpPort = httpSocketAddress.getPort();
+
+   sendLaunchedEvents();
+   final SpeculatorEvent attemptStarted =
+       new SpeculatorEvent(this.attemptId, true, this.clock.getTime());
+   this.eventHandler.handle(attemptStarted);
+
+   // remoteTask is no longer needed; drop the reference to free the memory.
+   this.remoteTask = null;
+
+   // Tell the Task that this attempt has started.
+   final TaskTAttemptEvent attemptLaunched =
+       new TaskTAttemptEvent(this.attemptId, TaskEventType.T_ATTEMPT_LAUNCHED);
+   this.eventHandler.handle(attemptLaunched);
+ }
+
private static class CommitPendingTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java
index 2d54633..9842e0d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java
@@ -18,9 +18,12 @@
package org.apache.hadoop.mapreduce.v2.app.rm;
import java.io.IOException;
+import java.util.Map;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -38,6 +41,9 @@
void decContainerReq(ContainerRequest request);
+ /**
+ * Tells the requestor that {@code allocated} has been assigned to the
+ * given request. The RMContainerRequestor implementation decrements the
+ * request and sends a TaskAttemptContainerAssignedEvent to the attempt.
+ *
+ * @param allocated the container that was assigned
+ * @param assigned the request the container was assigned to
+ * @param acls application ACLs passed along with the assignment event
+ */
+ void containerAssigned(Container allocated, ContainerRequest assigned,
+ Map<ApplicationAccessType, String> acls);
+
void release(ContainerId containerId);
boolean isNodeBlacklisted(String hostname);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 7d0b4b7..b6fd1fa 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -53,7 +53,6 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@@ -257,7 +256,7 @@
dispatcher.register(RMContainerReuseRequestor.EventType.class,
(RMContainerReuseRequestor) containerRequestor);
} else {
- containerRequestor = new RMContainerRequestor(this);
+ containerRequestor = new RMContainerRequestor(eventHandler, this);
}
containerRequestor.init(conf);
}
@@ -1298,11 +1297,8 @@
private void containerAssigned(Container allocated,
ContainerRequest assigned) {
// Update resource requests
- containerRequestor.decContainerReq(assigned);
-
- // send the container-assigned event to task attempt
- eventHandler.handle(new TaskAttemptContainerAssignedEvent(
- assigned.attemptID, allocated, applicationACLs));
+ containerRequestor.containerAssigned(allocated, assigned,
+ applicationACLs);
assignedRequests.add(allocated, assigned.attemptID);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index 82ef24f..f68227b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -34,25 +34,29 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,11 +118,16 @@
.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private final ApplicationId applicationId;
private final RMCommunicator rmCommunicator;
+ // Handler that receives the TaskAttemptContainerAssignedEvent sent by
+ // containerAssigned(). Assigned once in the constructor, hence final like
+ // the neighbouring fields.
+ @SuppressWarnings("rawtypes")
+ private final EventHandler eventHandler;
- public RMContainerRequestor(RMCommunicator rmCommunicator) {
+ /**
+ * @param eventHandler handler used to deliver container-assigned events
+ *          to task attempts
+ * @param rmCommunicator communicator this requestor works for; its
+ *          application id is captured here
+ */
+ @SuppressWarnings("rawtypes")
+ public RMContainerRequestor(EventHandler eventHandler,
+ RMCommunicator rmCommunicator) {
super(RMContainerRequestor.class.getName());
this.rmCommunicator = rmCommunicator;
applicationId = rmCommunicator.applicationId;
+ this.eventHandler = eventHandler;
}
@Private
@@ -424,17 +433,28 @@
req.nodeLabelExpression);
}
+ /**
+  * Default handling of an assignment: updates the outstanding resource
+  * requests for {@code req} and then notifies the task attempt.
+  *
+  * @param allocated the container given to the attempt
+  * @param req the request the container was assigned to
+  * @param applicationACLs ACLs forwarded with the assignment event
+  */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void containerAssigned(Container allocated, ContainerRequest req,
+     Map<ApplicationAccessType, String> applicationACLs) {
+   decContainerReq(req);
+
+   // Build the container-assigned event, then hand it to the task attempt.
+   final TaskAttemptContainerAssignedEvent assignedEvent =
+       new TaskAttemptContainerAssignedEvent(req.attemptID, allocated,
+           applicationACLs);
+   eventHandler.handle(assignedEvent);
+ }
+
+ /**
+ * Calls decResourceRequest once for every host and every rack named in
+ * {@code req}, and once more for {@code ResourceRequest.ANY}, each time
+ * with the request's priority and capability.
+ *
+ * @param req the container request whose resource requests are updated
+ */
@Override
public void decContainerReq(ContainerRequest req) {
// Update resource requests
for (String hostName : req.hosts) {
decResourceRequest(req.priority, hostName, req.capability);
}
-
+
for (String rack : req.racks) {
decResourceRequest(req.priority, rack, req.capability);
}
-
+
decResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java
index 7559693..2bdfa91 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java
@@ -34,7 +34,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -53,8 +55,8 @@
private static final Log LOG = LogFactory
.getLog(RMContainerReuseRequestor.class);
- private Map<Container, String> containersToReuse =
- new ConcurrentHashMap<Container, String>();
+ private Map<Container, HostInfo> containersToReuse =
+ new ConcurrentHashMap<>();
private Map<ContainerId, List<TaskAttemptId>> containerToTaskAttemptsMap =
new HashMap<ContainerId, List<TaskAttemptId>>();
private int containerReuseMaxMapTasks;
@@ -63,14 +65,17 @@
private int maxReduceTaskContainers;
private int noOfMapTaskContainersForReuse;
private int noOfReduceTaskContainersForReuse;
+ private final RMCommunicator rmCommunicator;
+ @SuppressWarnings("rawtypes")
+ private final EventHandler eventHandler;
- private RMCommunicator rmCommunicator;
-
+ /**
+ * Both arguments are passed to the superclass and are also kept in this
+ * class's own fields, because the corresponding superclass fields are
+ * private.
+ *
+ * @param eventHandler handler used to deliver container-assigned events
+ * @param rmCommunicator the communicator this requestor works for; this
+ *          class casts it to RMContainerAllocator when resetting a
+ *          container for reuse
+ */
+ @SuppressWarnings("rawtypes")
public RMContainerReuseRequestor(
- EventHandler<ContainerAvailableEvent> eventHandler,
+ EventHandler eventHandler,
RMCommunicator rmCommunicator) {
- super(rmCommunicator);
+ super(eventHandler, rmCommunicator);
this.rmCommunicator = rmCommunicator;
+ this.eventHandler = eventHandler;
}
@Override
@@ -113,8 +118,8 @@
boolean blacklisted = super.isNodeBlacklisted(hostName);
if (blacklisted) {
Set<Container> containersOnHost = new HashSet<Container>();
- for (Entry<Container, String> elem : containersToReuse.entrySet()) {
- if (elem.getValue().equals(hostName)) {
+ for (Entry<Container, HostInfo> elem : containersToReuse.entrySet()) {
+ if (elem.getValue().getHost().equals(hostName)) {
containersOnHost.add(elem.getKey());
}
}
@@ -139,6 +144,7 @@
containerTaskAttempts = new ArrayList<TaskAttemptId>();
containerToTaskAttemptsMap.put(containerId, containerTaskAttempts);
}
+ TaskAttemptId taskAttemptId = event.getTaskAttemptId();
if (checkMapContainerReuseConstraints(priority, containerTaskAttempts)
|| checkReduceContainerReuseConstraints(priority,
containerTaskAttempts)) {
@@ -147,13 +153,17 @@
// If there are any eligible requests
if (resourceRequests != null && !resourceRequests.isEmpty()) {
canReuse = true;
- containerTaskAttempts.add(event.getTaskAttemptId());
+ containerTaskAttempts.add(taskAttemptId);
}
}
((RMContainerAllocator) rmCommunicator)
.resetContainerForReuse(container.getId());
if (canReuse) {
- containersToReuse.put(container, resourceName);
+ int shufflePort =
+ rmCommunicator.getJob().getTask(taskAttemptId.getTaskId())
+ .getAttempt(taskAttemptId).getShufflePort();
+ containersToReuse.put(container,
+ new HostInfo(resourceName, shufflePort));
incrementRunningReuseContainers(priority);
LOG.info("Adding the " + containerId + " for reuse.");
} else {
@@ -211,7 +221,7 @@
+ /**
+ * Returns the map of containers held for reuse, each mapped to the host
+ * and shuffle port recorded for it. The backing map itself is returned,
+ * not a copy, so changes made by the caller are visible to this requestor.
+ */
@Private
@VisibleForTesting
- Map<Container, String> getContainersToReuse() {
+ Map<Container, HostInfo> getContainersToReuse() {
return containersToReuse;
}
@@ -221,4 +231,34 @@
public static enum EventType {
CONTAINER_AVAILABLE
}
+
+ /**
+  * Assigns {@code allocated} to the task attempt behind {@code req}.
+  *
+  * <p>If the container is one that is being held for reuse, the shuffle
+  * port recorded for it is attached to the assignment event. An event whose
+  * shuffle port is not {@code -1} makes ContainerAssignedTransition move the
+  * attempt directly to RUNNING instead of requesting a remote launch. Every
+  * other container takes the regular path in the superclass.
+  *
+  * @param allocated the container given to the attempt
+  * @param req the request the container was assigned to
+  * @param applicationACLs ACLs forwarded with the assignment event
+  */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void containerAssigned(Container allocated, ContainerRequest req,
+     Map<ApplicationAccessType, String> applicationACLs) {
+   // One lookup instead of containsKey() followed by get(): the two calls
+   // could disagree if the entry were removed from the map in between.
+   HostInfo reused = containersToReuse.get(allocated);
+   if (reused == null) {
+     super.containerAssigned(allocated, req, applicationACLs);
+     return;
+   }
+
+   decContainerReq(req);
+   TaskAttemptContainerAssignedEvent assignedEvent =
+       new TaskAttemptContainerAssignedEvent(req.attemptID, allocated,
+           applicationACLs);
+   // Without the port the event is indistinguishable from a fresh
+   // allocation and the attempt would ask for the container to be launched.
+   assignedEvent.setShufflePort(reused.getPort());
+   // send the container-assigned event to task attempt
+   eventHandler.handle(assignedEvent);
+ }
+
+ /**
+  * Immutable pair of the host a reusable container is recorded under and
+  * the shuffle port stored alongside it.
+  */
+ static class HostInfo {
+   private final String host;
+   private final int port;
+
+   /**
+    * @param host host name the container is recorded under
+    * @param port shuffle port recorded for the container
+    */
+   public HostInfo(String host, int port) {
+     this.host = host;
+     this.port = port;
+   }
+
+   /** @return the host passed to the constructor */
+   public String getHost() {
+     return host;
+   }
+
+   /** @return the port passed to the constructor */
+   public int getPort() {
+     return port;
+   }
+ }
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java
index d747e74..2ca7cc8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.mapreduce.v2.app.rm;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Map;
@@ -29,8 +31,12 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.EventType;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.HostInfo;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -51,8 +57,16 @@
@Before
public void setup() throws IOException {
+ // RMContainerReuseRequestor reads the shuffle port through
+ // rmCommunicator.getJob().getTask(..).getAttempt(..).getShufflePort(),
+ // so every link of that chain is mocked here.
+ RMContainerAllocator allocator = mock(RMContainerAllocator.class);
+ Job job = mock(Job.class);
+ Task task = mock(Task.class);
+ TaskAttempt taskAttempt = mock(TaskAttempt.class);
+ when(taskAttempt.getShufflePort()).thenReturn(0);
+ when(task.getAttempt(any(TaskAttemptId.class))).thenReturn(taskAttempt);
+ when(job.getTask(any(TaskId.class))).thenReturn(task);
+ when(allocator.getJob()).thenReturn(job);
reuseRequestor = new RMContainerReuseRequestor(null,
- mock(RMContainerAllocator.class));
+ allocator);
}
@Test
@@ -138,14 +152,14 @@
@Test
public void testContainerFailedOnHost() throws Exception {
reuseRequestor.serviceInit(new Configuration());
- Map<Container, String> containersToReuse = reuseRequestor
+ Map<Container, HostInfo> containersToReuse = reuseRequestor
.getContainersToReuse();
containersToReuse
.put(newContainerInstance("container_1472171035081_0009_01_000008",
- RMContainerAllocator.PRIORITY_REDUCE), "node1");
+ RMContainerAllocator.PRIORITY_REDUCE), new HostInfo("node1", 1999));
containersToReuse
.put(newContainerInstance("container_1472171035081_0009_01_000009",
- RMContainerAllocator.PRIORITY_REDUCE), "node2");
+ RMContainerAllocator.PRIORITY_REDUCE), new HostInfo("node2", 1999));
reuseRequestor.getBlacklistedNodes().add("node1");
// It removes all containers from containersToReuse running in node1
reuseRequestor.containerFailedOnHost("node1");
@@ -172,7 +186,7 @@
ContainerAvailableEvent event = new ContainerAvailableEvent(
EventType.CONTAINER_AVAILABLE, taskAttemptId, container);
reuseRequestor.handle(event);
- Map<Container, String> containersToReuse = reuseRequestor
+ Map<Container, HostInfo> containersToReuse = reuseRequestor
.getContainersToReuse();
Assert.assertTrue("Container should be added for reuse.",
containersToReuse.containsKey(container));
@@ -206,7 +220,7 @@
ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType,
taskAttemptId1, container);
reuseRequestor.handle(event1);
- Map<Container, String> containersToReuse = reuseRequestor
+ Map<Container, HostInfo> containersToReuse = reuseRequestor
.getContainersToReuse();
// It is reusing the container
Assert.assertTrue("Container should be added for reuse.",
@@ -236,7 +250,7 @@
ContainerAvailableEvent event1 = new ContainerAvailableEvent(
EventType.CONTAINER_AVAILABLE, taskAttemptId1, container);
reuseRequestor.handle(event1);
- Map<Container, String> containersToReuse = reuseRequestor
+ Map<Container, HostInfo> containersToReuse = reuseRequestor
.getContainersToReuse();
Assert.assertTrue("Container should be added for reuse.",
containersToReuse.containsKey(container));
@@ -269,7 +283,7 @@
ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType,
taskAttemptId1, container1);
reuseRequestor.handle(event1);
- Map<Container, String> containersToReuse = reuseRequestor
+ Map<Container, HostInfo> containersToReuse = reuseRequestor
.getContainersToReuse();
Assert.assertTrue("Container should be added for reuse.",
containersToReuse.containsKey(container1));