TEZ-4279: Add vertexId into vertex status for dag clients (#101) (Laszlo Bodor reviewed by Ashutosh Chauhan)
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index acc5f12..5a2cb64 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -624,23 +624,20 @@
}
public static DAGProtos.StatusGetOptsProto convertStatusGetOptsToProto(
- StatusGetOpts statusGetOpts) {
+ StatusGetOpts statusGetOpts) {
switch (statusGetOpts) {
- case GET_COUNTERS:
- return DAGProtos.StatusGetOptsProto.GET_COUNTERS;
+ case GET_COUNTERS:
+ return DAGProtos.StatusGetOptsProto.GET_COUNTERS;
}
- throw new TezUncheckedException("Could not convert StatusGetOpts to"
- + " proto");
+ throw new TezUncheckedException("Could not convert StatusGetOpts to" + " proto");
}
- public static StatusGetOpts convertStatusGetOptsFromProto(
- DAGProtos.StatusGetOptsProto proto) {
+ public static StatusGetOpts convertStatusGetOptsFromProto(DAGProtos.StatusGetOptsProto proto) {
switch (proto) {
- case GET_COUNTERS:
- return StatusGetOpts.GET_COUNTERS;
+ case GET_COUNTERS:
+ return StatusGetOpts.GET_COUNTERS;
}
- throw new TezUncheckedException("Could not convert to StatusGetOpts from"
- + " proto");
+ throw new TezUncheckedException("Could not convert to StatusGetOpts from" + " proto");
}
public static List<DAGProtos.StatusGetOptsProto> convertStatusGetOptsToProto(
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
index dfb9bbe..9efb12d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
@@ -59,6 +59,10 @@
this.proxy = proxy;
}
+ public String getId() {
+ return proxy.getId();
+ }
+
public State getState() {
return getState(proxy.getState());
}
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 34c369d..4c8c7f6 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -245,10 +245,11 @@
}
message VertexStatusProto {
- optional VertexStatusStateProto state = 1;
- repeated string diagnostics = 2;
- optional ProgressProto progress = 3;
- optional TezCountersProto vertexCounters = 4;
+ required string id = 1;
+ optional VertexStatusStateProto state = 2;
+ repeated string diagnostics = 3;
+ optional ProgressProto progress = 4;
+ optional TezCountersProto vertexCounters = 5;
}
enum DAGStatusStateProto {
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
index 265fce9..edb7fd8 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
@@ -34,8 +34,10 @@
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.security.DAGAccessControls;
import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
+import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.StatusGetOptsProto;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto;
@@ -235,6 +237,23 @@
assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), aclInfo);
}
+ /*
+ * This unit test can catch if a StatusGetOpts <-> StatusGetOptsProto value is not defined at any
+ * side.
+ */
+ @Test
+ public void testConvertStatusGetOptsToProtoCoverage() {
+ StatusGetOpts[] opts = StatusGetOpts.values();
+ for (StatusGetOpts opt : opts) {
+ DagTypeConverters.convertStatusGetOptsToProto(opt);
+ }
+
+ StatusGetOptsProto[] optProtos = StatusGetOptsProto.values();
+ for (StatusGetOptsProto proto : optProtos) {
+ DagTypeConverters.convertStatusGetOptsFromProto(proto);
+ }
+ }
+
private void assertSame(DAGAccessControls dagAccessControls, ACLInfo aclInfo) {
assertEquals(dagAccessControls.getUsersWithViewACLs(),
Sets.newHashSet(aclInfo.getUsersWithViewAccessList()));
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
index 6a5e817..50c9a60 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
@@ -152,6 +152,7 @@
.build();
vertexStatusProtoWithoutCounters = VertexStatusProto.newBuilder()
+ .setId("vertex_1")
.addDiagnostics("V_Diagnostics_0")
.setProgress(vertexProgressProto)
.setState(VertexStatusStateProto.VERTEX_SUCCEEDED) // make sure the waitForCompletion be able to finish
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
index 4de321c..0304fc9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
@@ -28,6 +28,7 @@
import org.apache.tez.dag.api.records.DAGProtos.VertexStatusStateProto;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.records.TezVertexID;
public class VertexStatusBuilder extends VertexStatus {
@@ -35,6 +36,10 @@
super(VertexStatusProto.newBuilder());
}
+ public void setId(TezVertexID vertexId) {
+ getBuilder().setId(vertexId.toString());
+ }
+
public void setState(VertexState state) {
getBuilder().setState(getProtoState(state));
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index e21add0..452dae5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1536,6 +1536,7 @@
this.readLock.lock();
try {
VertexStatusBuilder status = new VertexStatusBuilder();
+ status.setId(getVertexId());
status.setState(getInternalState());
status.setDiagnostics(diagnostics);
status.setProgress(getVertexProgress());