PIG-4700: Enable progress reporting for Tasks in Tez (satishsaley via rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1797708 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 9a55723..48f37bd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -24,6 +24,8 @@
  
 IMPROVEMENTS
 
+PIG-4700: Enable progress reporting for Tasks in Tez (satishsaley via rohini)
+
 PIG-5251: Bump joda-time to 2.9.9 (dbist13 via rohini)
  
 OPTIMIZATIONS
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
index 9d2ffb3..fc26fac 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
@@ -18,6 +18,9 @@
 package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -65,6 +68,7 @@
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.mapreduce.hadoop.MRConfig;
 import org.apache.tez.mapreduce.output.MROutput;
@@ -107,6 +111,7 @@
 
     private Configuration conf;
     private PigHadoopLogger pigHadoopLogger;
+    private Object progressHelper;
 
     public static String sampleVertex;
     public static Map<String, Object> sampleMap;
@@ -203,6 +208,21 @@
 
     @Override
     public void close() throws Exception {
+        /*
+         * The reflective calls below are equivalent to the direct form:
+         *   if (progressHelper != null) { progressHelper.shutDownProgressTaskService(); }
+         */
+        try {
+            if (progressHelper != null) {
+                Class<?> clazz = Class.forName("org.apache.tez.common.ProgressHelper");
+                Method shutDownProgressTaskService = clazz.getMethod("shutDownProgressTaskService");
+                shutDownProgressTaskService.invoke(progressHelper);
+            }
+        }
+        catch (ClassNotFoundException | NoSuchMethodException | SecurityException | IllegalAccessException
+                | IllegalArgumentException | InvocationTargetException e) {
+            // Best-effort shutdown via reflection: lookup and invocation failures are deliberately ignored
+        }
         execPlan = null;
         fileOutputs = null;
         leaf = null;
@@ -221,6 +241,26 @@
     @Override
     public void run(Map<String, LogicalInput> inputs,
             Map<String, LogicalOutput> outputs) throws Exception {
+        /*
+         * The reflective calls below are equivalent to the direct form:
+         *   progressHelper = new ProgressHelper(inputs, getContext(), this.getClass().getSimpleName());
+         *   progressHelper.scheduleProgressTaskService(100, Math.max(1000,
+         *       conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+         *           TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT) - 50));
+         */
+        try {
+            Class<?> clazz = Class.forName("org.apache.tez.common.ProgressHelper");
+            Constructor<?> ctor = clazz.getConstructor(Map.class, ProcessorContext.class, String.class);
+            progressHelper = ctor.newInstance(inputs, getContext(), this.getClass().getSimpleName());
+            Method scheduleProgressTaskService = clazz.getMethod("scheduleProgressTaskService", long.class, long.class);
+            scheduleProgressTaskService.invoke(progressHelper, 100,
+                    Math.max(1000, conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+                            TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT) - 50));
+        }
+        catch (IllegalAccessException | IllegalArgumentException | InstantiationException | InvocationTargetException
+                | ClassNotFoundException | NoSuchMethodException | SecurityException e) {
+            // Best-effort setup via reflection: on any lookup or invocation failure, processing continues below
+        }
 
         try {
             initializeInputs(inputs);
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
index d6a42eb..b604d9f 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
@@ -95,8 +95,6 @@
         mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL);
         // TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY TEZ-2914 in Tez 0.8
         mrMapParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", "tez.am.vertex.max-task-concurrency");
-        // TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS TEZ-808 in Tez 0.8
-        mrMapParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms");
         // TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT TEZ-3271 in Tez 0.8.4
         mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_FAILURES_MAX_PERCENT, "tez.vertex.failures.maxpercent");
 
@@ -105,8 +103,17 @@
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL);
         mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", "tez.am.vertex.max-task-concurrency");
-        mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms");
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, "tez.vertex.failures.maxpercent");
+
+        try {
+            Class.forName("org.apache.tez.common.ProgressHelper");
+            // TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS TEZ-808 in Tez 0.8
+            mrMapParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.task.progress.stuck.interval-ms");
+            mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.task.progress.stuck.interval-ms");
+        }
+        catch (ClassNotFoundException e) {
+            // Not translating before Tez 0.8.5 due to TEZ-3549
+        }
     }
 
     private static void populateMRSettingsToRetain() {