Tez-4: Integrate with Yarn's removal of AMResponse
git-svn-id: https://svn.apache.org/repos/asf/incubator/tez/trunk@1468619 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java b/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java
index 139ec05..2a12752 100644
--- a/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java
+++ b/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java
@@ -36,7 +36,6 @@
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -79,10 +78,10 @@
this.applicationAttemptId, this.lastResponseID, super
.getApplicationProgress(), new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
- AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
- AMResponse response;
+
+ AllocateResponse response;
try {
- response = allocateResponse.getAMResponse();
+ response = scheduler.allocate(allocateRequest);
// Reset retry count if no exception occurred.
retrystartTime = System.currentTimeMillis();
} catch (Exception e) {
diff --git a/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java b/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
index 4849cea..4f59300 100644
--- a/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
+++ b/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
@@ -45,7 +45,6 @@
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -58,6 +57,8 @@
import org.apache.hadoop.yarn.util.Records;
+
+
/**
* Keeps the data structures to send container requests to RM.
*/
@@ -319,7 +320,7 @@
int headRoom = getAvailableResources() != null ? getAvailableResources()
.getMemory() : 0;// first time it would be null
int lastClusterNmCount = clusterNmCount;
- AMResponse response = errorCheckedMakeRemoteRequest();
+ AllocateResponse response = errorCheckedMakeRemoteRequest();
int newHeadRoom = getAvailableResources() != null ? getAvailableResources()
.getMemory() : 0;
@@ -373,8 +374,8 @@
@SuppressWarnings("unchecked")
- protected AMResponse errorCheckedMakeRemoteRequest() throws Exception {
- AMResponse response = null;
+ protected AllocateResponse errorCheckedMakeRemoteRequest() throws Exception {
+ AllocateResponse response = null;
try {
response = makeRemoteRequest();
// Reset retry count if no exception occurred.
@@ -405,7 +406,7 @@
}
- protected AMResponse makeRemoteRequest() throws Exception {
+ protected AllocateResponse makeRemoteRequest() throws Exception {
List<ContainerId> clonedReleaseList = cloneAndClearReleaseList();
List<ResourceRequest> clonedAskList = cloneAndClearAskList();
@@ -419,20 +420,20 @@
rePopulateListsOnError(clonedReleaseList, clonedAskList);
throw e;
}
- AMResponse response = allocateResponse.getAMResponse();
- lastResponseID = response.getResponseId();
- availableResources = response.getAvailableResources();
+
+ lastResponseID = allocateResponse.getResponseId();
+ availableResources = allocateResponse.getAvailableResources();
clusterNmCount = allocateResponse.getNumClusterNodes();
if (clonedAskList.size() > 0 || clonedReleaseList.size() > 0) {
LOG.info("getResources() for " + applicationId + ":" + " ask="
+ clonedAskList.size() + " release= " + clonedReleaseList.size()
- + " newContainers=" + response.getAllocatedContainers().size()
- + " finishedContainers="+ response.getCompletedContainersStatuses().size()
+ + " newContainers=" + allocateResponse.getAllocatedContainers().size()
+ + " finishedContainers="+ allocateResponse.getCompletedContainersStatuses().size()
+ " resourcelimit=" + availableResources + " knownNMs=" + clusterNmCount);
}
- return response;
+ return allocateResponse;
}
@Override
diff --git a/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java b/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java
index 507e369..95b6ce9 100644
--- a/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java
+++ b/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java
@@ -52,7 +52,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -301,10 +300,8 @@
}
}
- AMResponse amResponse = Records.newRecord(AMResponse.class);
- amResponse.setAllocatedContainers(containers);
- amResponse.setResponseId(request.getResponseId() + 1);
- response.setAMResponse(amResponse);
+ response.setAllocatedContainers(containers);
+ response.setResponseId(request.getResponseId() + 1);
response.setNumClusterNodes(350);
return response;
}
diff --git a/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java b/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java
index c915e08..c5a14b4 100644
--- a/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java
+++ b/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java
@@ -47,7 +47,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -167,12 +166,10 @@
}
private AMRMProtocolForFailedAllocate createAMRMProtocolForFailedAllocate() {
- AMResponse amResponse =
- newAMResponse(new ArrayList<Container>(),
+ AllocateResponse allocateResponse =
+ newAllocateResponse(new ArrayList<Container>(),
BuilderUtils.newResource(1024, 1), new ArrayList<ContainerStatus>(),
- false, 1, new ArrayList<NodeReport>());
- AllocateResponse allocateResponse = newAllocateResponse(
- amResponse, 2);
+ false, 1, new ArrayList<NodeReport>(), 2);
return new AMRMProtocolForFailedAllocate(allocateResponse);
}
@@ -238,12 +235,10 @@
public AMRMProtocol createSchedulerProxy() {
if (amRmProtocol == null) {
amRmProtocol = mock(AMRMProtocol.class);
- AMResponse amResponse = newAMResponse(
+ AllocateResponse allocateResponse = newAllocateResponse(
new ArrayList<Container>(), BuilderUtils.newResource(1024, 1),
new ArrayList<ContainerStatus>(), false, 1,
- new ArrayList<NodeReport>());
- AllocateResponse allocateResponse = newAllocateResponse(
- amResponse, 2);
+ new ArrayList<NodeReport>(), 2);
try {
when(amRmProtocol.allocate(any(AllocateRequest.class))).thenReturn(allocateResponse);
} catch (YarnRemoteException e) {
@@ -288,12 +283,10 @@
public AllocateResponse allocate(AllocateRequest request)
throws YarnRemoteException {
this.allocateRequest = request;
- AMResponse amResponse = newAMResponse(
+ AllocateResponse allocateResponse = newAllocateResponse(
new ArrayList<Container>(), BuilderUtils.newResource(1024, 1),
new ArrayList<ContainerStatus>(), false, 1,
- new ArrayList<NodeReport>());
- AllocateResponse allocateResponse = newAllocateResponse(
- amResponse, 2);
+ new ArrayList<NodeReport>(),2);
return allocateResponse;
}
}
@@ -343,24 +336,17 @@
return allocateRequest;
}
- public static AllocateResponse newAllocateResponse(AMResponse amResponse,
- int numNodes) {
- AllocateResponse response = Records.newRecord(AllocateResponse.class);
- response.setAMResponse(amResponse);
- response.setNumClusterNodes(numNodes);
- return response;
- }
-
- public static AMResponse newAMResponse(List<Container> allocated,
+ public static AllocateResponse newAllocateResponse(List<Container> allocated,
Resource available, List<ContainerStatus> completed, boolean reboot,
- int responseId, List<NodeReport> nodeUpdates) {
- AMResponse amResponse = Records.newRecord(AMResponse.class);
+ int responseId, List<NodeReport> nodeUpdates, int numNodes) {
+ AllocateResponse amResponse = Records.newRecord(AllocateResponse.class);
amResponse.setAllocatedContainers(allocated);
amResponse.setAvailableResources(available);
amResponse.setCompletedContainersStatuses(completed);
amResponse.setReboot(reboot);
amResponse.setResponseId(responseId);
amResponse.setUpdatedNodes(nodeUpdates);
+ amResponse.setNumClusterNodes(numNodes);
return amResponse;
}
}