TEZ-4447: Collect VertexStatus properly when DAGClientServer is not used (local mode without network) (#239) (Laszlo Bodor reviewed by Rajesh Balamohan)
diff --git a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
index 2ec6d28..26c11fd 100644
--- a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
@@ -145,7 +145,7 @@
+ ", applicationId=" + sessionAppId
+ ", dagId=" + dagId
+ ", dagName=" + dag.getName());
- return new DAGClientImpl(sessionAppId, dagId, tezConf, this, ugi);
+ return getDAGClient(sessionAppId, dagId, tezConf, ugi);
}
protected DAGClientAMProtocolBlockingPB waitForProxy(long clientTimeout, Configuration conf,
@@ -186,4 +186,9 @@
UserGroupInformation ugi) throws TezException, IOException {
return TezClientUtils.getAMProxy(this, conf, sessionAppId, ugi);
}
+
+ public DAGClient getDAGClient(ApplicationId appId, String dagId, TezConfiguration tezConf,
+ UserGroupInformation ugi) {
+ return new DAGClientImpl(appId, dagId, tezConf, this, ugi);
+ }
}
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index c37f0c1..93807fd 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -76,7 +76,6 @@
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
-import org.apache.tez.dag.api.client.DAGClientImpl;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import com.google.common.annotations.VisibleForTesting;
@@ -1117,15 +1116,7 @@
@Private
static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf,
FrameworkClient frameworkClient, UserGroupInformation ugi) throws IOException, TezException {
- return new DAGClientImpl(appId, getDefaultTezDAGID(appId), tezConf, frameworkClient, ugi);
- }
-
- @Private // Used only for MapReduce compatibility code
- static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf,
- FrameworkClient frameworkClient) throws IOException, TezException {
- UserGroupInformation ugi =
- UserGroupInformation.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName());
- return getDAGClient(appId, tezConf, frameworkClient, ugi);
+ return frameworkClient.getDAGClient(appId, getDefaultTezDAGID(appId), tezConf, ugi);
}
// DO NOT CHANGE THIS. This code is replicated from TezDAGID.java
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index bfea96b..95dd85f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -296,9 +296,13 @@
}
@Override
- public VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions) throws
- IOException, TezException {
+ public VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions)
+ throws IOException, TezException {
+ return getVertexStatusInternal(statusOptions, vertexName);
+ }
+ protected VertexStatus getVertexStatusInternal(Set<StatusGetOpts> statusOptions, String vertexName)
+ throws IOException, TezException {
if (!dagCompleted) {
VertexStatus vertexStatus = getVertexStatusViaAM(vertexName, statusOptions);
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java
index a0509cd..851bb68 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java
@@ -36,12 +36,15 @@
public class DAGClientImplLocal extends DAGClientImpl {
private BiFunction<Set<StatusGetOpts>, Long, DAGStatus> dagStatusFunction;
+ private BiFunction<Set<StatusGetOpts>, String, VertexStatus> vertexStatusFunction;
public DAGClientImplLocal(ApplicationId appId, String dagId, TezConfiguration conf,
FrameworkClient frameworkClient, UserGroupInformation ugi,
- BiFunction<Set<StatusGetOpts>, Long, DAGStatus> dagStatusFunction) {
+ BiFunction<Set<StatusGetOpts>, Long, DAGStatus> dagStatusFunction,
+ BiFunction<Set<StatusGetOpts>, String, VertexStatus> vertexStatusFunction) {
super(appId, dagId, conf, frameworkClient, ugi);
this.dagStatusFunction = dagStatusFunction;
+ this.vertexStatusFunction = vertexStatusFunction;
}
@Override
@@ -50,4 +53,10 @@
return dagStatusFunction.apply(statusOptions == null ? new HashSet<>() : statusOptions,
timeout);
}
+
+ @Override
+ protected VertexStatus getVertexStatusInternal(@Nullable Set<StatusGetOpts> statusOptions, String vertexName)
+ throws TezException, IOException {
+ return vertexStatusFunction.apply(statusOptions == null ? new HashSet<>() : statusOptions, vertexName);
+ }
}
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index d0580bb..c9b3d73 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -60,6 +60,7 @@
import org.apache.tez.dag.api.client.DAGClientImplLocal;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.app.AppContext;
@@ -426,20 +427,32 @@
}
String dagId = dagAppMaster.submitDAGToAppMaster(request.getDAGPlan(), additionalResources);
+ return getDAGClient(sessionAppId, dagId, tezConf, ugi);
+ }
+ @Override
+ public DAGClient getDAGClient(ApplicationId appId, String dagId, TezConfiguration tezConf,
+ UserGroupInformation ugi) {
return isLocalWithoutNetwork
- ? new DAGClientImplLocal(sessionAppId, dagId, tezConf, this,
- ugi, new BiFunction<Set<StatusGetOpts>, Long, DAGStatus>() {
- @Override
- public DAGStatus apply(Set<StatusGetOpts> statusOpts, Long timeout) {
- try {
- return clientHandler.getDAGStatus(dagId, statusOpts, timeout);
- } catch (TezException e) {
- throw new RuntimeException(e);
- }
- }
- })
- : new DAGClientImpl(sessionAppId, dagId, tezConf, this, ugi);
+ ? new DAGClientImplLocal(appId, dagId, tezConf, this, ugi, new BiFunction<Set<StatusGetOpts>, Long, DAGStatus>() {
+ @Override
+ public DAGStatus apply(Set<StatusGetOpts> statusOpts, Long timeout) {
+ try {
+ return clientHandler.getDAGStatus(dagId, statusOpts, timeout);
+ } catch (TezException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }, new BiFunction<Set<StatusGetOpts>, String, VertexStatus>() {
+ @Override
+ public VertexStatus apply(Set<StatusGetOpts> statusOpts, String vertexName) {
+ try {
+ return clientHandler.getVertexStatus(dagId, vertexName, statusOpts);
+ } catch (TezException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }) : new DAGClientImpl(appId, dagId, tezConf, this, ugi);
}
@Override
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/client/MRTezClient.java b/tez-mapreduce/src/main/java/org/apache/tez/client/MRTezClient.java
index 86089e9..1057932 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/client/MRTezClient.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/client/MRTezClient.java
@@ -31,7 +31,6 @@
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.MRDAGClient;
@InterfaceAudience.Private
public class MRTezClient extends TezClient {
@@ -46,9 +45,4 @@
throws TezException, IOException {
return super.submitDAGApplication(appId, dag);
}
-
- public static MRDAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf, FrameworkClient frameworkClient)
- throws IOException, TezException {
- return new MRDAGClient(TezClient.getDAGClient(appId, tezConf, frameworkClient));
- }
}
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index 9dba357..7aed4a0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -639,7 +639,7 @@
tezClient = new MRTezClient("MapReduce", dagAMConf, false, jobLocalResources, ts);
tezClient.start();
- tezClient.submitDAGApplication(appId, dag);
+ dagClient = new MRDAGClient(tezClient.submitDAGApplication(appId, dag));
tezClient.stop();
} catch (TezException e) {
throw new IOException(e);
@@ -702,9 +702,6 @@
String jobFile = MRApps.getJobFile(conf, user, jobID);
DAGStatus dagStatus;
try {
- if(dagClient == null) {
- dagClient = MRTezClient.getDAGClient(TypeConverter.toYarn(jobID).getAppId(), tezConf, null);
- }
dagStatus = dagClient.getDAGStatus(null);
return new DAGJobStatus(dagClient.getApplicationReport(), dagStatus, jobFile);
} catch (TezException e) {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
index 3efcd21..7750a13 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -46,6 +46,7 @@
public class SleepProcessor extends AbstractLogicalIOProcessor {
private static final Logger LOG = LoggerFactory.getLogger(SleepProcessor.class);
+ public static final String SLEEP_VERTEX_NAME = "Sleep";
private int timeToSleepMS;
protected Map<String, LogicalInput> inputs;
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
index bdb71ad..00125fd 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
@@ -39,6 +39,7 @@
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.examples.OrderedWordCount;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
@@ -131,6 +132,8 @@
DAGClient dagClient1 = tezClient1.submitDAG(dag1);
dagClient1.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());
+ assertEquals(VertexStatus.State.SUCCEEDED,
+ dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
dagClient1.close();
tezClient1.stop();
@@ -142,6 +145,8 @@
DAGClient dagClient2 = tezClient2.submitDAG(dag2);
dagClient2.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient2.getDAGStatus(null).getState());
+ assertEquals(VertexStatus.State.SUCCEEDED,
+ dagClient2.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
assertFalse(dagClient1.getExecutionContext().equals(dagClient2.getExecutionContext()));
dagClient2.close();
tezClient2.stop();
@@ -159,7 +164,8 @@
DAGClient dagClient1 = tezClient1.submitDAG(dag1);
dagClient1.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());
-
+ assertEquals(VertexStatus.State.SUCCEEDED,
+ dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
dagClient1.close();
tezClient1.stop();
@@ -171,6 +177,8 @@
DAGClient dagClient2 = tezClient2.submitDAG(dag2);
dagClient2.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient2.getDAGStatus(null).getState());
+ assertEquals(VertexStatus.State.SUCCEEDED,
+ dagClient2.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
assertFalse(dagClient1.getExecutionContext().equals(dagClient2.getExecutionContext()));
dagClient2.close();
tezClient2.stop();
@@ -189,7 +197,8 @@
DAGClient dagClient1 = tezClient1.submitDAG(dag1);
dagClient1.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());
-
+ assertEquals(VertexStatus.State.SUCCEEDED,
+ dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
// Sleep for more time than is required for the DAG to complete.
Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5));
@@ -210,7 +219,8 @@
DAGClient dagClient1 = tezClient1.submitDAG(dag1);
dagClient1.waitForCompletion();
assertEquals(DAGStatus.State.FAILED, dagClient1.getDAGStatus(null).getState());
-
+ assertEquals(VertexStatus.State.FAILED,
+ dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
// Sleep for more time than is required for the DAG to complete.
Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5));
@@ -245,12 +255,11 @@
}
private DAG createSimpleDAG(String dagName, String processorName) {
- DAG dag = DAG.create(dagName).addVertex(Vertex.create("Sleep", ProcessorDescriptor.create(
- processorName).setUserPayload(
- new SleepProcessor.SleepProcessorConfig(1).toUserPayload()), 1));
+ DAG dag = DAG.create(dagName).addVertex(Vertex.create(SleepProcessor.SLEEP_VERTEX_NAME, ProcessorDescriptor
+ .create(processorName).setUserPayload(new SleepProcessor.SleepProcessorConfig(1).toUserPayload()), 1));
return dag;
-
}
+
@Test(timeout=30000)
public void testMultiDAGsOnSession() throws IOException, TezException, InterruptedException {
int dags = 2;//two dags will be submitted to session