TEZ-2868. Fix setting Caller Context in Tez Examples. (hitesh)
diff --git a/CHANGES.txt b/CHANGES.txt
index 58b574c..8cd6400 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2868. Fix setting Caller Context in Tez Examples.
TEZ-2860. NPE in DAGClientImpl.
TEZ-2855. Fix a potential NPE while routing VertexManager events.
TEZ-2758. Remove append API in RecoveryService after TEZ-1909.
@@ -198,6 +199,7 @@
INCOMPATIBLE CHANGES
ALL CHANGES
+ TEZ-2868. Fix setting Caller Context in Tez Examples.
TEZ-2860. NPE in DAGClientImpl.
TEZ-2855. Fix a potential NPE while routing VertexManager events.
TEZ-2758. Remove append API in RecoveryService after TEZ-1909.
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 68b6d52..e4fdc18 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -37,9 +37,7 @@
import org.apache.tez.common.JavaOptsChecker;
import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
import org.apache.tez.dag.api.records.DAGProtos;
-import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
-import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index c88c833..5922100 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -147,6 +147,7 @@
if (appId != null) {
callerContext.setCallerIdAndType(appId.toString(), "TezExampleApplication");
}
+ dag.setCallerContext(callerContext);
DAGClient dagClient = tezClientInternal.submitDAG(dag);
Set<StatusGetOpts> getOpts = Sets.newHashSet();
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index 6966e8d..3188c6e 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -277,7 +277,6 @@
vertices.add(finalReduceVertex);
DAG dag = DAG.create("OrderedWordCount" + dagIndex);
- dag.setCallerContext(CallerContext.create("Tez", "TestOrderedWordCount Job"));
for (int i = 0; i < vertices.size(); ++i) {
dag.addVertex(vertices.get(i));
}
@@ -447,6 +446,12 @@
DAG dag = instance.createDAG(fs, tezConf, localResources,
stagingDir, dagIndex, inputPath, outputPath,
generateSplitsInClient, useMRSettings, intermediateNumReduceTasks);
+ String callerType = "TestOrderedWordCount";
+ String callerId = tezSession.getAppMasterApplicationId() == null ?
+ ( "UnknownApp_" + System.currentTimeMillis() + dagIndex ) :
+ ( tezSession.getAppMasterApplicationId().toString() + "_" + dagIndex);
+ dag.setCallerContext(CallerContext.create("Tez", callerId, callerType,
+ "TestOrderedWordCount Job"));
boolean doPreWarm = dagIndex == 1 && useTezSession
&& conf.getBoolean("PRE_WARM_SESSION", true);