MAPREDUCE-6785. ContainerLauncherImpl support for reusing the containers.
Contributed by Naganarasimha G R.
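
At a high level, the patch lets a task attempt skip the container-launch round trip when it is assigned a container that is already running a task JVM: TaskAttemptContainerAssignedEvent now carries the container's shuffle port, and ContainerAssignedTransition branches on it. The sketch below condenses that branching with simplified, illustrative stand-ins (AttemptState and AssignedEvent are not the real MRAppMaster types, and the port value is just an example); it shows the decision only, not the actual implementation.

    // Illustrative sketch only; simplified stand-ins for TaskAttemptStateInternal
    // and TaskAttemptContainerAssignedEvent from this patch.
    public class ReuseBranchSketch {
      enum AttemptState { ASSIGNED, RUNNING }

      static final class AssignedEvent {
        private final int shufflePort;  // -1 means a freshly allocated container
        AssignedEvent(int shufflePort) { this.shufflePort = shufflePort; }
        int getShufflePort() { return shufflePort; }
      }

      static AttemptState onAssigned(AssignedEvent event) {
        if (event.getShufflePort() == -1) {
          // New container: request a launch via the container launcher and stay
          // in ASSIGNED until the container-launched callback arrives.
          return AttemptState.ASSIGNED;
        }
        // Reused container: the JVM is already up and its shuffle port is known,
        // so the attempt can move straight to RUNNING.
        return AttemptState.RUNNING;
      }

      public static void main(String[] args) {
        System.out.println(onAssigned(new AssignedEvent(-1)));     // ASSIGNED
        System.out.println(onAssigned(new AssignedEvent(13562)));  // RUNNING (example port)
      }
    }
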
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java
index 0f69fa8..41dfb03 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java
@@ -28,6 +28,7 @@
 
   private final Container container;
   private final Map<ApplicationAccessType, String> applicationACLs;
+  private int shufflePort = -1;
 
   public TaskAttemptContainerAssignedEvent(TaskAttemptId id,
       Container container, Map<ApplicationAccessType, String> applicationACLs) {
@@ -36,6 +37,14 @@
     this.applicationACLs = applicationACLs;
   }
 
+  public int getShufflePort() {
+    return shufflePort;
+  }
+
+  public void setShufflePort(int shufflePort) {
+    this.shufflePort = shufflePort;
+  }
+
   public Container getContainer() {
     return this.container;
   }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 3943a3a..1f9ea90 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -265,7 +265,8 @@
 
      // Transitions from the UNASSIGNED state.
      .addTransition(TaskAttemptStateInternal.UNASSIGNED,
-         TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
+         EnumSet.of(TaskAttemptStateInternal.ASSIGNED,
+         TaskAttemptStateInternal.RUNNING), TaskAttemptEventType.TA_ASSIGNED,
          new ContainerAssignedTransition())
      .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
@@ -1876,13 +1877,14 @@
   }
 
   private static class ContainerAssignedTransition implements
-      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+      MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent,
+      TaskAttemptStateInternal> {
     @SuppressWarnings({ "unchecked" })
     @Override
-    public void transition(final TaskAttemptImpl taskAttempt, 
-        TaskAttemptEvent event) {
-      final TaskAttemptContainerAssignedEvent cEvent = 
-        (TaskAttemptContainerAssignedEvent) event;
+    public TaskAttemptStateInternal transition(
+        final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
+      final TaskAttemptContainerAssignedEvent cEvent =
+          (TaskAttemptContainerAssignedEvent) event;
       Container container = cEvent.getContainer();
       taskAttempt.container = container;
       // this is a _real_ Task (classic Hadoop mapred flavor):
@@ -1895,20 +1897,26 @@
           taskAttempt.remoteTask, taskAttempt.jvmID);
 
       taskAttempt.computeRackAndLocality();
-      
-      //launch the container
-      //create the container object to be launched for a given Task attempt
-      ContainerLaunchContext launchContext = createContainerLaunchContext(
-          cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
-          taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
-          taskAttempt.taskAttemptListener, taskAttempt.credentials);
-      taskAttempt.eventHandler
-        .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
-          launchContext, container, taskAttempt.remoteTask));
 
-      // send event to speculator that our container needs are satisfied
-      taskAttempt.eventHandler.handle
-          (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
+      if (cEvent.getShufflePort() == -1) {
+        // launch the container
+        // create the container object to be launched for a given Task attempt
+        ContainerLaunchContext launchContext = createContainerLaunchContext(
+            cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
+            taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
+            taskAttempt.taskAttemptListener, taskAttempt.credentials);
+        taskAttempt.eventHandler
+            .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
+                launchContext, container, taskAttempt.remoteTask));
+
+        // send event to speculator that our container needs are satisfied
+        taskAttempt.eventHandler
+            .handle(new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
+        return TaskAttemptStateInternal.ASSIGNED;
+      } else {
+        taskAttempt.onContainerLaunch(cEvent.getShufflePort());
+        return TaskAttemptStateInternal.RUNNING;
+      }
     }
   }
 
@@ -1982,7 +1990,6 @@
 
   private static class LaunchedContainerTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
-    @SuppressWarnings("unchecked")
     @Override
     public void transition(TaskAttemptImpl taskAttempt, 
         TaskAttemptEvent evnt) {
@@ -1990,34 +1997,34 @@
       TaskAttemptContainerLaunchedEvent event =
         (TaskAttemptContainerLaunchedEvent) evnt;
 
-      //set the launch time
-      taskAttempt.launchTime = taskAttempt.clock.getTime();
-      taskAttempt.shufflePort = event.getShufflePort();
-
-      // register it to TaskAttemptListener so that it can start monitoring it.
-      taskAttempt.taskAttemptListener
-        .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
-
-      //TODO Resolve to host / IP in case of a local address.
-      InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr?
-          NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
-      taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
-      taskAttempt.httpPort = nodeHttpInetAddr.getPort();
-      taskAttempt.sendLaunchedEvents();
-      taskAttempt.eventHandler.handle
-          (new SpeculatorEvent
-              (taskAttempt.attemptId, true, taskAttempt.clock.getTime()));
-      //make remoteTask reference as null as it is no more needed
-      //and free up the memory
-      taskAttempt.remoteTask = null;
-      
-      //tell the Task that attempt has started
-      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
-          taskAttempt.attemptId, 
-         TaskEventType.T_ATTEMPT_LAUNCHED));
+      taskAttempt.onContainerLaunch(event.getShufflePort());
     }
   }
-   
+
+  @SuppressWarnings("unchecked")
+  private void onContainerLaunch(int shufflePortParam) {
+    // set the launch time
+    launchTime = clock.getTime();
+    this.shufflePort = shufflePortParam;
+
+    // register it to TaskAttemptListener so that it can start monitoring it.
+    taskAttemptListener.registerLaunchedTask(attemptId, jvmID);
+    // TODO Resolve to host / IP in case of a local address.
+    InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr?
+        NetUtils.createSocketAddr(container.getNodeHttpAddress());
+    trackerName = nodeHttpInetAddr.getHostName();
+    httpPort = nodeHttpInetAddr.getPort();
+    sendLaunchedEvents();
+    eventHandler.handle(new SpeculatorEvent(attemptId, true, clock.getTime()));
+    // clear the remoteTask reference as it is no longer needed,
+    // freeing up the memory
+    remoteTask = null;
+
+    // tell the Task that attempt has started
+    eventHandler.handle(
+        new TaskTAttemptEvent(attemptId, TaskEventType.T_ATTEMPT_LAUNCHED));
+  }
+
   private static class CommitPendingTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @SuppressWarnings("unchecked")
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java
index 2d54633..9842e0d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java
@@ -18,9 +18,12 @@
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -38,6 +41,9 @@
 
   void decContainerReq(ContainerRequest request);
 
+  void containerAssigned(Container allocated, ContainerRequest assigned,
+      Map<ApplicationAccessType, String> acls);
+
   void release(ContainerId containerId);
 
   boolean isNodeBlacklisted(String hostname);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 7d0b4b7..b6fd1fa 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -53,7 +53,6 @@
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@@ -257,7 +256,7 @@
       dispatcher.register(RMContainerReuseRequestor.EventType.class,
           (RMContainerReuseRequestor) containerRequestor);
     } else {
-      containerRequestor = new RMContainerRequestor(this);
+      containerRequestor = new RMContainerRequestor(eventHandler, this);
     }
     containerRequestor.init(conf);
   }
@@ -1298,11 +1297,8 @@
     private void containerAssigned(Container allocated, 
                                     ContainerRequest assigned) {
       // Update resource requests
-      containerRequestor.decContainerReq(assigned);
-
-      // send the container-assigned event to task attempt
-      eventHandler.handle(new TaskAttemptContainerAssignedEvent(
-          assigned.attemptID, allocated, applicationACLs));
+      containerRequestor.containerAssigned(allocated, assigned,
+          applicationACLs);
 
       assignedRequests.add(allocated, assigned.attemptID);
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index 82ef24f..f68227b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -34,25 +34,29 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -114,11 +118,16 @@
       .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
   private final ApplicationId applicationId;
   private final RMCommunicator rmCommunicator;
+  @SuppressWarnings("rawtypes")
+  private final EventHandler eventHandler;
 
-  public RMContainerRequestor(RMCommunicator rmCommunicator) {
+  @SuppressWarnings("rawtypes")
+  public RMContainerRequestor(EventHandler eventHandler,
+      RMCommunicator rmCommunicator) {
     super(RMContainerRequestor.class.getName());
     this.rmCommunicator = rmCommunicator;
     applicationId = rmCommunicator.applicationId;
+    this.eventHandler = eventHandler;
   }
 
   @Private
@@ -424,17 +433,28 @@
         req.nodeLabelExpression);
   }
 
+  @SuppressWarnings("unchecked")
+  @Override
+  public void containerAssigned(Container allocated, ContainerRequest req,
+      Map<ApplicationAccessType, String> applicationACLs) {
+    decContainerReq(req);
+
+    // send the container-assigned event to task attempt
+    eventHandler.handle(new TaskAttemptContainerAssignedEvent(
+        req.attemptID, allocated, applicationACLs));
+  }
+
   @Override
   public void decContainerReq(ContainerRequest req) {
     // Update resource requests
     for (String hostName : req.hosts) {
       decResourceRequest(req.priority, hostName, req.capability);
     }
-    
+
     for (String rack : req.racks) {
       decResourceRequest(req.priority, rack, req.capability);
     }
-   
+
     decResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java
index 7559693..2bdfa91 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java
@@ -34,7 +34,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -53,8 +55,8 @@
   private static final Log LOG = LogFactory
       .getLog(RMContainerReuseRequestor.class);
 
-  private Map<Container, String> containersToReuse =
-      new ConcurrentHashMap<Container, String>();
+  private Map<Container, HostInfo> containersToReuse =
+      new ConcurrentHashMap<>();
   private Map<ContainerId, List<TaskAttemptId>> containerToTaskAttemptsMap =
       new HashMap<ContainerId, List<TaskAttemptId>>();
   private int containerReuseMaxMapTasks;
@@ -63,14 +65,17 @@
   private int maxReduceTaskContainers;
   private int noOfMapTaskContainersForReuse;
   private int noOfReduceTaskContainersForReuse;
+  private final RMCommunicator rmCommunicator;
+  @SuppressWarnings("rawtypes")
+  private final EventHandler eventHandler;
 
-  private RMCommunicator rmCommunicator;
-
+  @SuppressWarnings("rawtypes")
   public RMContainerReuseRequestor(
-      EventHandler<ContainerAvailableEvent> eventHandler,
+      EventHandler eventHandler,
       RMCommunicator rmCommunicator) {
-    super(rmCommunicator);
+    super(eventHandler, rmCommunicator);
     this.rmCommunicator = rmCommunicator;
+    this.eventHandler = eventHandler;
   }
 
   @Override
@@ -113,8 +118,8 @@
     boolean blacklisted = super.isNodeBlacklisted(hostName);
     if (blacklisted) {
       Set<Container> containersOnHost = new HashSet<Container>();
-      for (Entry<Container, String> elem : containersToReuse.entrySet()) {
-        if (elem.getValue().equals(hostName)) {
+      for (Entry<Container, HostInfo> elem : containersToReuse.entrySet()) {
+        if (elem.getValue().getHost().equals(hostName)) {
           containersOnHost.add(elem.getKey());
         }
       }
@@ -139,6 +144,7 @@
         containerTaskAttempts = new ArrayList<TaskAttemptId>();
         containerToTaskAttemptsMap.put(containerId, containerTaskAttempts);
       }
+      TaskAttemptId taskAttemptId = event.getTaskAttemptId();
       if (checkMapContainerReuseConstraints(priority, containerTaskAttempts)
           || checkReduceContainerReuseConstraints(priority,
               containerTaskAttempts)) {
@@ -147,13 +153,17 @@
         // If there are any eligible requests
         if (resourceRequests != null && !resourceRequests.isEmpty()) {
           canReuse = true;
-          containerTaskAttempts.add(event.getTaskAttemptId());
+          containerTaskAttempts.add(taskAttemptId);
         }
       }
       ((RMContainerAllocator) rmCommunicator)
           .resetContainerForReuse(container.getId());
       if (canReuse) {
-        containersToReuse.put(container, resourceName);
+        int shufflePort =
+            rmCommunicator.getJob().getTask(taskAttemptId.getTaskId())
+                .getAttempt(taskAttemptId).getShufflePort();
+        containersToReuse.put(container,
+            new HostInfo(resourceName, shufflePort));
         incrementRunningReuseContainers(priority);
         LOG.info("Adding the " + containerId + " for reuse.");
       } else {
@@ -211,7 +221,7 @@
 
   @Private
   @VisibleForTesting
-  Map<Container, String> getContainersToReuse() {
+  Map<Container, HostInfo> getContainersToReuse() {
     return containersToReuse;
   }
 
@@ -221,4 +231,34 @@
   public static enum EventType {
     CONTAINER_AVAILABLE
   }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void containerAssigned(Container allocated, ContainerRequest req,
+      Map<ApplicationAccessType, String> applicationACLs) {
+    if (containersToReuse.containsKey(allocated)) {
+      decContainerReq(req);
+      // send the container-assigned event to task attempt
+      eventHandler.handle(new TaskAttemptContainerAssignedEvent(
+          req.attemptID, allocated, applicationACLs));
+    } else {
+      super.containerAssigned(allocated, req, applicationACLs);
+    }
+  }
+
+  static class HostInfo {
+    private final String host;
+    private final int port;
+    public HostInfo(String host, int port) {
+      super();
+      this.host = host;
+      this.port = port;
+    }
+    public String getHost() {
+      return host;
+    }
+    public int getPort() {
+      return port;
+    }
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java
index d747e74..2ca7cc8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.Map;
@@ -29,8 +31,12 @@
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.EventType;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.HostInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -51,8 +57,16 @@
 
   @Before
   public void setup() throws IOException {
+    RMContainerAllocator allocator = mock(RMContainerAllocator.class);
+    Job job = mock(Job.class);
+    Task task = mock(Task.class);
+    TaskAttempt taskAttempt = mock(TaskAttempt.class);
+    when(taskAttempt.getShufflePort()).thenReturn(0);
+    when(task.getAttempt(any(TaskAttemptId.class))).thenReturn(taskAttempt);
+    when(job.getTask(any(TaskId.class))).thenReturn(task);
+    when(allocator.getJob()).thenReturn(job);
     reuseRequestor = new RMContainerReuseRequestor(null,
-        mock(RMContainerAllocator.class));
+        allocator);
   }
 
   @Test
@@ -138,14 +152,14 @@
   @Test
   public void testContainerFailedOnHost() throws Exception {
     reuseRequestor.serviceInit(new Configuration());
-    Map<Container, String> containersToReuse = reuseRequestor
+    Map<Container, HostInfo> containersToReuse = reuseRequestor
         .getContainersToReuse();
     containersToReuse
         .put(newContainerInstance("container_1472171035081_0009_01_000008",
-            RMContainerAllocator.PRIORITY_REDUCE), "node1");
+            RMContainerAllocator.PRIORITY_REDUCE), new HostInfo("node1", 1999));
     containersToReuse
         .put(newContainerInstance("container_1472171035081_0009_01_000009",
-            RMContainerAllocator.PRIORITY_REDUCE), "node2");
+            RMContainerAllocator.PRIORITY_REDUCE), new HostInfo("node2", 1999));
     reuseRequestor.getBlacklistedNodes().add("node1");
     // It removes all containers from containersToReuse running in node1
     reuseRequestor.containerFailedOnHost("node1");
@@ -172,7 +186,7 @@
       ContainerAvailableEvent event = new ContainerAvailableEvent(
           EventType.CONTAINER_AVAILABLE, taskAttemptId, container);
       reuseRequestor.handle(event);
-      Map<Container, String> containersToReuse = reuseRequestor
+      Map<Container, HostInfo> containersToReuse = reuseRequestor
           .getContainersToReuse();
       Assert.assertTrue("Container should be added for reuse.",
           containersToReuse.containsKey(container));
@@ -206,7 +220,7 @@
     ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType,
         taskAttemptId1, container);
     reuseRequestor.handle(event1);
-    Map<Container, String> containersToReuse = reuseRequestor
+    Map<Container, HostInfo> containersToReuse = reuseRequestor
         .getContainersToReuse();
     // It is reusing the container
     Assert.assertTrue("Container should be added for reuse.",
@@ -236,7 +250,7 @@
       ContainerAvailableEvent event1 = new ContainerAvailableEvent(
           EventType.CONTAINER_AVAILABLE, taskAttemptId1, container);
       reuseRequestor.handle(event1);
-      Map<Container, String> containersToReuse = reuseRequestor
+      Map<Container, HostInfo> containersToReuse = reuseRequestor
           .getContainersToReuse();
       Assert.assertTrue("Container should be added for reuse.",
           containersToReuse.containsKey(container));
@@ -269,7 +283,7 @@
     ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType,
         taskAttemptId1, container1);
     reuseRequestor.handle(event1);
-    Map<Container, String> containersToReuse = reuseRequestor
+    Map<Container, HostInfo> containersToReuse = reuseRequestor
         .getContainersToReuse();
     Assert.assertTrue("Container should be added for reuse.",
         containersToReuse.containsKey(container1));