PIG-5314: Abort method is not implemented in PigProcessor (satishsaley via rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1816557 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 8c037f3..250aae1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -60,6 +60,8 @@
BUG FIXES
+PIG-5314: Abort method is not implemented in PigProcessor (satishsaley via rohini)
+
PIG-5307: NPE in TezOperDependencyParallelismEstimator (rohini)
PIG-5272: BagToTuple output schema is incorrect (juen1jp via rohini)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
index fc26fac..7cb59e7 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
@@ -115,6 +115,7 @@
public static String sampleVertex;
public static Map<String, Object> sampleMap;
+ private volatile boolean isAborted = false;
public PigProcessor(ProcessorContext context) {
super(context);
@@ -305,9 +306,12 @@
}
if (!fileOutputs.isEmpty()) {
- while (!getContext().canCommit()) {
+ while (!getContext().canCommit() && !isAborted) {
Thread.sleep(100);
}
+ if (isAborted) {
+ return;
+ }
for (MROutput fileOutput : fileOutputs){
fileOutput.flush();
if (fileOutput.isCommitRequired()) {
@@ -464,4 +468,11 @@
}
}
+ // TODO add @Override when we upgrade to Tez 0.9 dependency
+ public void abort() {
+ isAborted = true;
+ LOG.warn("Aborting execution");
+ abortOutput();
+ }
+
}