TEZ-4447: Collect VertexStatus properly when DAGClientServer is not used (local mode without network) (#239) (Laszlo Bodor reviewed by Rajesh Balamohan)

diff --git a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
index 2ec6d28..26c11fd 100644
--- a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
@@ -145,7 +145,7 @@
         + ", applicationId=" + sessionAppId
         + ", dagId=" + dagId
         + ", dagName=" + dag.getName());
-    return new DAGClientImpl(sessionAppId, dagId, tezConf, this, ugi);
+    return getDAGClient(sessionAppId, dagId, tezConf, ugi);
   }
 
   protected DAGClientAMProtocolBlockingPB waitForProxy(long clientTimeout, Configuration conf,
@@ -186,4 +186,9 @@
       UserGroupInformation ugi) throws TezException, IOException {
     return TezClientUtils.getAMProxy(this, conf, sessionAppId, ugi);
   }
+
+  public DAGClient getDAGClient(ApplicationId appId, String dagId, TezConfiguration tezConf,
+      UserGroupInformation ugi) {
+    return new DAGClientImpl(appId, dagId, tezConf, this, ugi);
+  }
 }
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index c37f0c1..93807fd 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -76,7 +76,6 @@
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
-import org.apache.tez.dag.api.client.DAGClientImpl;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -1117,15 +1116,7 @@
   @Private
   static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf,
       FrameworkClient frameworkClient, UserGroupInformation ugi) throws IOException, TezException {
-    return new DAGClientImpl(appId, getDefaultTezDAGID(appId), tezConf, frameworkClient, ugi);
-  }
-
-  @Private // Used only for MapReduce compatibility code
-  static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf,
-      FrameworkClient frameworkClient) throws IOException, TezException {
-    UserGroupInformation ugi =
-        UserGroupInformation.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName());
-    return getDAGClient(appId, tezConf, frameworkClient, ugi);
+    return frameworkClient.getDAGClient(appId, getDefaultTezDAGID(appId), tezConf, ugi);
   }
 
   // DO NOT CHANGE THIS. This code is replicated from TezDAGID.java
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index bfea96b..95dd85f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -296,9 +296,13 @@
   }
 
   @Override
-  public VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions) throws
-      IOException, TezException {
+  public VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions)
+      throws IOException, TezException {
+    return getVertexStatusInternal(statusOptions, vertexName);
+  }
 
+  protected VertexStatus getVertexStatusInternal(Set<StatusGetOpts> statusOptions, String vertexName)
+      throws IOException, TezException {
     if (!dagCompleted) {
       VertexStatus vertexStatus = getVertexStatusViaAM(vertexName, statusOptions);
 
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java
index a0509cd..851bb68 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java
@@ -36,12 +36,15 @@
 public class DAGClientImplLocal extends DAGClientImpl {
 
   private BiFunction<Set<StatusGetOpts>, Long, DAGStatus> dagStatusFunction;
+  private BiFunction<Set<StatusGetOpts>, String, VertexStatus> vertexStatusFunction;
 
   public DAGClientImplLocal(ApplicationId appId, String dagId, TezConfiguration conf,
       FrameworkClient frameworkClient, UserGroupInformation ugi,
-      BiFunction<Set<StatusGetOpts>, Long, DAGStatus> dagStatusFunction) {
+      BiFunction<Set<StatusGetOpts>, Long, DAGStatus> dagStatusFunction,
+      BiFunction<Set<StatusGetOpts>, String, VertexStatus> vertexStatusFunction) {
     super(appId, dagId, conf, frameworkClient, ugi);
     this.dagStatusFunction = dagStatusFunction;
+    this.vertexStatusFunction = vertexStatusFunction;
   }
 
   @Override
@@ -50,4 +53,10 @@
     return dagStatusFunction.apply(statusOptions == null ? new HashSet<>() : statusOptions,
         timeout);
   }
+
+  @Override
+  protected VertexStatus getVertexStatusInternal(@Nullable Set<StatusGetOpts> statusOptions, String vertexName)
+      throws TezException, IOException {
+    return vertexStatusFunction.apply(statusOptions == null ? new HashSet<>() : statusOptions, vertexName);
+  }
 }
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index d0580bb..c9b3d73 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -60,6 +60,7 @@
 import org.apache.tez.dag.api.client.DAGClientImplLocal;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.app.AppContext;
@@ -426,20 +427,32 @@
     }
 
     String dagId = dagAppMaster.submitDAGToAppMaster(request.getDAGPlan(), additionalResources);
+    return getDAGClient(sessionAppId, dagId, tezConf, ugi);
+  }
 
+  @Override
+  public DAGClient getDAGClient(ApplicationId appId, String dagId, TezConfiguration tezConf,
+      UserGroupInformation ugi) {
     return isLocalWithoutNetwork
-      ? new DAGClientImplLocal(sessionAppId, dagId, tezConf, this,
-          ugi, new BiFunction<Set<StatusGetOpts>, Long, DAGStatus>() {
-            @Override
-            public DAGStatus apply(Set<StatusGetOpts> statusOpts, Long timeout) {
-              try {
-                return clientHandler.getDAGStatus(dagId, statusOpts, timeout);
-              } catch (TezException e) {
-                throw new RuntimeException(e);
-              }
-            }
-          })
-      : new DAGClientImpl(sessionAppId, dagId, tezConf, this, ugi);
+      ? new DAGClientImplLocal(appId, dagId, tezConf, this, ugi, new BiFunction<Set<StatusGetOpts>, Long, DAGStatus>() {
+        @Override
+        public DAGStatus apply(Set<StatusGetOpts> statusOpts, Long timeout) {
+          try {
+            return clientHandler.getDAGStatus(dagId, statusOpts, timeout);
+          } catch (TezException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }, new BiFunction<Set<StatusGetOpts>, String, VertexStatus>() {
+        @Override
+        public VertexStatus apply(Set<StatusGetOpts> statusOpts, String vertexName) {
+          try {
+            return clientHandler.getVertexStatus(dagId, vertexName, statusOpts);
+          } catch (TezException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }) : new DAGClientImpl(appId, dagId, tezConf, this, ugi);
   }
 
   @Override
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/client/MRTezClient.java b/tez-mapreduce/src/main/java/org/apache/tez/client/MRTezClient.java
index 86089e9..1057932 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/client/MRTezClient.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/client/MRTezClient.java
@@ -31,7 +31,6 @@
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.MRDAGClient;
 
 @InterfaceAudience.Private
 public class MRTezClient extends TezClient {
@@ -46,9 +45,4 @@
       throws TezException, IOException {
     return super.submitDAGApplication(appId, dag);
   }
-
-  public static MRDAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf, FrameworkClient frameworkClient)
-      throws IOException, TezException {
-    return new MRDAGClient(TezClient.getDAGClient(appId, tezConf, frameworkClient));
-  }
 }
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index 9dba357..7aed4a0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -639,7 +639,7 @@
       
       tezClient = new MRTezClient("MapReduce", dagAMConf, false, jobLocalResources, ts);
       tezClient.start();
-      tezClient.submitDAGApplication(appId, dag);
+      dagClient = new MRDAGClient(tezClient.submitDAGApplication(appId, dag));
       tezClient.stop();
     } catch (TezException e) {
       throw new IOException(e);
@@ -702,9 +702,6 @@
     String jobFile = MRApps.getJobFile(conf, user, jobID);
     DAGStatus dagStatus;
     try {
-      if(dagClient == null) {
-        dagClient = MRTezClient.getDAGClient(TypeConverter.toYarn(jobID).getAppId(), tezConf, null);
-      }
       dagStatus = dagClient.getDAGStatus(null);
       return new DAGJobStatus(dagClient.getApplicationReport(), dagStatus, jobFile);
     } catch (TezException e) {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
index 3efcd21..7750a13 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -46,6 +46,7 @@
 public class SleepProcessor extends AbstractLogicalIOProcessor {
 
   private static final Logger LOG = LoggerFactory.getLogger(SleepProcessor.class);
+  public static final String SLEEP_VERTEX_NAME = "Sleep";
 
   private int timeToSleepMS;
   protected Map<String, LogicalInput> inputs;
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
index bdb71ad..00125fd 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
@@ -39,6 +39,7 @@
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.examples.OrderedWordCount;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
@@ -131,6 +132,8 @@
     DAGClient dagClient1 = tezClient1.submitDAG(dag1);
     dagClient1.waitForCompletion();
     assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());
+    assertEquals(VertexStatus.State.SUCCEEDED,
+        dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
 
     dagClient1.close();
     tezClient1.stop();
@@ -142,6 +145,8 @@
     DAGClient dagClient2 = tezClient2.submitDAG(dag2);
     dagClient2.waitForCompletion();
     assertEquals(DAGStatus.State.SUCCEEDED, dagClient2.getDAGStatus(null).getState());
+    assertEquals(VertexStatus.State.SUCCEEDED,
+        dagClient2.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
     assertFalse(dagClient1.getExecutionContext().equals(dagClient2.getExecutionContext()));
     dagClient2.close();
     tezClient2.stop();
@@ -159,7 +164,8 @@
     DAGClient dagClient1 = tezClient1.submitDAG(dag1);
     dagClient1.waitForCompletion();
     assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());
-
+    assertEquals(VertexStatus.State.SUCCEEDED,
+        dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
     dagClient1.close();
     tezClient1.stop();
 
@@ -171,6 +177,8 @@
     DAGClient dagClient2 = tezClient2.submitDAG(dag2);
     dagClient2.waitForCompletion();
     assertEquals(DAGStatus.State.SUCCEEDED, dagClient2.getDAGStatus(null).getState());
+    assertEquals(VertexStatus.State.SUCCEEDED,
+        dagClient2.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
     assertFalse(dagClient1.getExecutionContext().equals(dagClient2.getExecutionContext()));
     dagClient2.close();
     tezClient2.stop();
@@ -189,7 +197,8 @@
     DAGClient dagClient1 = tezClient1.submitDAG(dag1);
     dagClient1.waitForCompletion();
     assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());
-
+    assertEquals(VertexStatus.State.SUCCEEDED,
+        dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
     // Sleep for more time than is required for the DAG to complete.
     Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5));
 
@@ -210,7 +219,8 @@
     DAGClient dagClient1 = tezClient1.submitDAG(dag1);
     dagClient1.waitForCompletion();
     assertEquals(DAGStatus.State.FAILED, dagClient1.getDAGStatus(null).getState());
-
+    assertEquals(VertexStatus.State.FAILED,
+        dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
     // Sleep for more time than is required for the DAG to complete.
     Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5));
 
@@ -245,12 +255,11 @@
   }
 
   private DAG createSimpleDAG(String dagName, String processorName) {
-    DAG dag = DAG.create(dagName).addVertex(Vertex.create("Sleep", ProcessorDescriptor.create(
-        processorName).setUserPayload(
-        new SleepProcessor.SleepProcessorConfig(1).toUserPayload()), 1));
+    DAG dag = DAG.create(dagName).addVertex(Vertex.create(SleepProcessor.SLEEP_VERTEX_NAME, ProcessorDescriptor
+        .create(processorName).setUserPayload(new SleepProcessor.SleepProcessorConfig(1).toUserPayload()), 1));
     return dag;
-
   }
+
   @Test(timeout=30000)
   public void testMultiDAGsOnSession() throws IOException, TezException, InterruptedException {
     int dags = 2;//two dags will be submitted to session