PIG-5121: Backport PIG-4916, PIG-4921 and PIG-4957 to 0.16 branch
git-svn-id: https://svn.apache.org/repos/asf/pig/branches/branch-0.16@1782110 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 3af5a40..819eab3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -32,6 +32,8 @@
BUG FIXES
+PIG-5121: Backport PIG-4916, PIG-4921 and PIG-4957 to 0.16 branch (daijy)
+
PIG-5119: SkewedJoin_15 is unstable (daijy)
PIG-5118: Script fails with Invalid dag containing 0 vertices (rohini)
diff --git a/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java b/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
index 515ce66..ff3d1c6 100644
--- a/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
+++ b/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
@@ -204,4 +204,14 @@
public static boolean isHadoopYARN() {
return false;
}
+
+ /**
+ * Add a JVM shutdown hook. This shim registers it directly with the Runtime, so its
+ * order relative to other hooks (e.g. the FileSystem cache shutdown) is unspecified.
+ * @param hook code to execute during shutdown
+ * @param priority ignored in Hadoop 1
+ */
+ public static void addShutdownHookWithPriority(Runnable hook, int priority) {
+ Runtime.getRuntime().addShutdownHook(new Thread(hook));
+ }
}
diff --git a/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java b/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java
index 31ceeb2..70eb3a7 100644
--- a/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java
+++ b/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.tools.pigstats.ScriptState;
@@ -67,14 +68,14 @@
timelineClient.init(yarnConf);
timelineClient.start();
}
- Runtime.getRuntime().addShutdownHook(new Thread() {
+ HadoopShims.addShutdownHookWithPriority(new Runnable() {
@Override
public void run() {
timelineClient.stop();
executor.shutdownNow();
executor = null;
}
- });
+ }, PigImplConstants.SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY);
log.info("Created ATS Hook");
}
diff --git a/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java b/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
index 8fbf33f..1aafeac 100644
--- a/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
+++ b/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
@@ -242,4 +243,15 @@
public static boolean isHadoopYARN() {
return true;
}
+
+ /**
+ * Add shutdown hook that runs before the FileSystem cache shutdown happens.
+ *
+ * @param hook code to execute during shutdown
+ * @param priority offset added to FileSystem.SHUTDOWN_HOOK_PRIORITY; hooks with a higher value run earlier
+ */
+ public static void addShutdownHookWithPriority(Runnable hook, int priority) {
+ ShutdownHookManager.get().addShutdownHook(hook,
+ FileSystem.SHUTDOWN_HOOK_PRIORITY + priority);
+ }
}
diff --git a/src/org/apache/pig/Main.java b/src/org/apache/pig/Main.java
index 3ced58b..0f938b4 100644
--- a/src/org/apache/pig/Main.java
+++ b/src/org/apache/pig/Main.java
@@ -59,8 +59,10 @@
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.impl.PigContext;
@@ -100,13 +102,12 @@
public class Main {
static {
- Runtime.getRuntime().addShutdownHook(new Thread() {
-
+ HadoopShims.addShutdownHookWithPriority(new Runnable() {
@Override
public void run() {
FileLocalizer.deleteTempResourceFiles();
}
- });
+ }, PigImplConstants.SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY);
}
private final static Log log = LogFactory.getLog(Main.class);
@@ -660,6 +661,7 @@
if(!gruntCalled) {
LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
}
+ killRunningJobsIfInterrupted(e, pigContext);
} catch (Throwable e) {
rc = ReturnCode.THROWABLE_EXCEPTION;
PigStatsUtil.setErrorMessage(e.getMessage());
@@ -668,6 +670,7 @@
if(!gruntCalled) {
LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
}
+ killRunningJobsIfInterrupted(e, pigContext);
} finally {
if (printScriptRunTime) {
printScriptRunTime(startTime);
@@ -694,6 +697,22 @@
+ " (" + duration.getMillis() + " ms)");
}
+ private static void killRunningJobsIfInterrupted(Throwable e, PigContext pigContext) {
+ Throwable cause = e.getCause();
+ // Kill running jobs when we get an InterruptedException.
+ // The Pig thread is interrupted by MapReduce when the Oozie launcher job is killed.
+ // The shutdown hook also kills running jobs, but sometimes the NodeManager can issue
+ // a SIGKILL after the AM unregisters and before the shutdown hook gets to execute,
+ // leaving orphaned jobs that continue to run.
+ if (e instanceof InterruptedException || (cause != null && cause instanceof InterruptedException)) {
+ try {
+ pigContext.getExecutionEngine().kill();
+ } catch (BackendException be) {
+ log.error("Error while killing running jobs", be);
+ }
+ }
+ }
+
protected static PigProgressNotificationListener makeListener(Properties properties) {
try {
diff --git a/src/org/apache/pig/backend/executionengine/ExecutionEngine.java b/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
index 558c252..7e1da87 100644
--- a/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
+++ b/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
@@ -183,6 +183,14 @@
public ExecutableManager getExecutableManager();
/**
+ * This method is called when a user requests to kill all jobs
+ * associated with the execution engine.
+ *
+ * @throws BackendException if killing the jobs fails
+ */
+ public void kill() throws BackendException;
+
+ /**
* This method is called when a user requests to kill a job associated with
* the given job id. If it is not possible for a user to kill a job, throw a
* exception. It is imperative for the job id's being displayed to be unique
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java b/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
index b0119f8..a594ac3 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
@@ -378,6 +378,13 @@
}
@Override
+ public void kill() throws BackendException {
+ if (launcher != null) {
+ launcher.kill();
+ }
+ }
+
+ @Override
public void killJob(String jobID) throws BackendException {
if (launcher != null) {
launcher.killJob(jobID, getJobConf());
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java b/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
index 35ff893..cc85991 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
@@ -76,7 +76,7 @@
protected Map<FileSpec, Exception> failureMap;
protected JobControl jc = null;
- class HangingJobKiller extends Thread {
+ protected class HangingJobKiller extends Thread {
public HangingJobKiller() {}
@Override
@@ -90,7 +90,6 @@
}
protected Launcher() {
- Runtime.getRuntime().addShutdownHook(new HangingJobKiller());
// handle the windows portion of \r
if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
newLine = "\r\n";
@@ -104,7 +103,6 @@
public void reset() {
failureMap = Maps.newHashMap();
totalHadoopTimeSpent = 0;
- jc = null;
}
/**
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
index 595e68c..6a666ae 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
@@ -19,7 +19,9 @@
import java.io.IOException;
import java.io.PrintStream;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -65,6 +67,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -86,7 +89,7 @@
* Main class that launches pig for Map Reduce
*
*/
-public class MapReduceLauncher extends Launcher{
+public class MapReduceLauncher extends Launcher {
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
@@ -94,14 +97,23 @@
private boolean aggregateWarning = false;
+ public MapReduceLauncher() {
+ super();
+ HadoopShims.addShutdownHookWithPriority(new HangingJobKiller(),
+ PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
+ }
+
@Override
public void kill() {
try {
- log.debug("Receive kill signal");
- if (jc!=null) {
+ if (jc != null && jc.getRunningJobs().size() > 0) {
+ log.info("Received kill signal");
for (Job job : jc.getRunningJobs()) {
HadoopShims.killJob(job);
log.info("Job " + job.getAssignedJobID() + " killed");
+ String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+ .format(Calendar.getInstance().getTime());
+ System.err.println(timeStamp + " Job " + job.getAssignedJobID() + " killed");
}
}
} catch (Exception e) {
@@ -301,8 +313,7 @@
// Now wait, till we are finished.
while(!jc.allFinished()){
- try { jcThread.join(sleepTime); }
- catch (InterruptedException e) {}
+ jcThread.join(sleepTime);
List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>();
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
index 7532dbd..17704c4 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
@@ -18,7 +18,9 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
import java.io.IOException;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -30,9 +32,11 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig;
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.tez.TezScriptState;
import org.apache.tez.client.TezAppMasterStatus;
@@ -47,13 +51,13 @@
private static final Log log = LogFactory.getLog(TezSessionManager.class);
static {
- Runtime.getRuntime().addShutdownHook(new Thread() {
+ HadoopShims.addShutdownHookWithPriority(new Runnable() {
@Override
public void run() {
TezSessionManager.shutdown();
}
- });
+ }, PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
}
private static ReentrantReadWriteLock sessionPoolLock = new ReentrantReadWriteLock();
@@ -273,6 +277,11 @@
synchronized (sessionInfo) {
if (sessionInfo.session == session) {
log.info("Stopping Tez session " + session);
+ String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+ .format(Calendar.getInstance().getTime());
+ System.err.println(timeStamp + " Shutting down Tez session "
+ + ", sessionName=" + session.getClientName()
+ + ", applicationId=" + session.getAppMasterApplicationId());
session.stop();
sessionToRemove = sessionInfo;
break;
@@ -299,19 +308,30 @@
shutdown = true;
for (SessionInfo sessionInfo : sessionPool) {
synchronized (sessionInfo) {
+ TezClient session = sessionInfo.session;
try {
- if (sessionInfo.session.getAppMasterStatus().equals(
+ String timeStamp = new SimpleDateFormat(
+ "yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
+ if (session.getAppMasterStatus().equals(
TezAppMasterStatus.SHUTDOWN)) {
log.info("Tez session is already shutdown "
- + sessionInfo.session);
+ + session);
+ System.err.println(timeStamp
+ + " Tez session is already shutdown " + session
+ + ", sessionName=" + session.getClientName()
+ + ", applicationId=" + session.getAppMasterApplicationId());
continue;
}
- log.info("Shutting down Tez session "
- + sessionInfo.session);
- sessionInfo.session.stop();
+ log.info("Shutting down Tez session " + session);
+ // Hadoop calls org.apache.log4j.LogManager.shutdown(), so the log.info message is
+ // not displayed when this runs from a shutdown hook in Oozie; also print to stderr.
+ System.err.println(timeStamp + " Shutting down Tez session "
+ + ", sessionName=" + session.getClientName()
+ + ", applicationId=" + session.getAppMasterApplicationId());
+ session.stop();
} catch (Exception e) {
log.error("Error shutting down Tez session "
- + sessionInfo.session, e);
+ + session, e);
}
}
}
diff --git a/src/org/apache/pig/impl/PigImplConstants.java b/src/org/apache/pig/impl/PigImplConstants.java
index c665b42..4fc3f85 100644
--- a/src/org/apache/pig/impl/PigImplConstants.java
+++ b/src/org/apache/pig/impl/PigImplConstants.java
@@ -84,4 +84,9 @@
* A unique id for a Pig session used as callerId for underlining component
*/
public static final String PIG_AUDIT_ID = "pig.script.id";
+
+ // Shutdown hook priorities (higher runs first): kill the jobs before cleaning up tmp files
+ public static int SHUTDOWN_HOOK_JOB_KILL_PRIORITY = 3;
+ public static int SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY = 2;
+ public static int SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY = 1;
}