PIG-4700: Enable progress reporting for Tasks in Tez (satishsaley via rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1797708 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 9a55723..48f37bd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -24,6 +24,8 @@
IMPROVEMENTS
+PIG-4700: Enable progress reporting for Tasks in Tez (satishsaley via rohini)
+
PIG-5251: Bump joda-time to 2.9.9 (dbist13 via rohini)
OPTIMIZATIONS
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
index 9d2ffb3..fc26fac 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
@@ -18,6 +18,9 @@
package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
@@ -65,6 +68,7 @@
import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.PigStatusReporter;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.output.MROutput;
@@ -107,6 +111,7 @@
private Configuration conf;
private PigHadoopLogger pigHadoopLogger;
+ private Object progressHelper;
public static String sampleVertex;
public static Map<String, Object> sampleMap;
@@ -203,6 +208,21 @@
@Override
public void close() throws Exception {
+ /*
+ * if (progressHelper != null) {
+ * progressHelper.shutDownProgressTaskService(); }
+ */
+ try {
+ if (progressHelper != null) {
+ Class<?> clazz = Class.forName("org.apache.tez.common.ProgressHelper");
+ Method shutDownProgressTaskService = clazz.getMethod("shutDownProgressTaskService");
+ shutDownProgressTaskService.invoke(progressHelper);
+ }
+ }
+ catch (ClassNotFoundException | NoSuchMethodException | SecurityException | IllegalAccessException
+ | IllegalArgumentException | InvocationTargetException e) {
+ // ignore
+ }
execPlan = null;
fileOutputs = null;
leaf = null;
@@ -221,6 +241,26 @@
@Override
public void run(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs) throws Exception {
+ /*
+ * progressHelper = new ProgressHelper(inputs, getContext(),
+ * this.getClass().getSimpleName());
+ * progressHelper.scheduleProgressTaskService(100, Math.max(1000,
+ * conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+ * TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT) - 50));
+ */
+ try {
+ Class<?> clazz = Class.forName("org.apache.tez.common.ProgressHelper");
+ Constructor<?> ctor = clazz.getConstructor(Map.class, ProcessorContext.class, String.class);
+ progressHelper = ctor.newInstance(inputs, getContext(), this.getClass().getSimpleName());
+ Method scheduleProgressTaskService = clazz.getMethod("scheduleProgressTaskService", long.class, long.class);
+ scheduleProgressTaskService.invoke(progressHelper, 100,
+ Math.max(1000, conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+ TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT) - 50));
+ }
+ catch (IllegalAccessException | IllegalArgumentException | InstantiationException | InvocationTargetException
+ | ClassNotFoundException | NoSuchMethodException | SecurityException e) {
+ // ignore
+ }
try {
initializeInputs(inputs);
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
index d6a42eb..b604d9f 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
@@ -95,8 +95,6 @@
mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL);
// TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY TEZ-2914 in Tez 0.8
mrMapParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", "tez.am.vertex.max-task-concurrency");
- // TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS TEZ-808 in Tez 0.8
- mrMapParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms");
// TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT TEZ-3271 in Tez 0.8.4
mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_FAILURES_MAX_PERCENT, "tez.vertex.failures.maxpercent");
@@ -105,8 +103,17 @@
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL);
mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", "tez.am.vertex.max-task-concurrency");
- mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms");
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, "tez.vertex.failures.maxpercent");
+
+ try {
+ Class.forName("org.apache.tez.common.ProgressHelper");
+ // TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS TEZ-808 in Tez 0.8
+ mrMapParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.task.progress.stuck.interval-ms");
+ mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.task.progress.stuck.interval-ms");
+ }
+ catch (ClassNotFoundException e) {
+ // Not translating before Tez 0.8.5 due to TEZ-3549
+ }
}
private static void populateMRSettingsToRetain() {