PIG-5318: Unit test failures on Pig on Spark with Spark 2.2 (nkollar via szita)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1817995 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index e3d869a..bdd211b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -60,6 +60,8 @@
BUG FIXES
+PIG-5318: Unit test failures on Pig on Spark with Spark 2.2 (nkollar via szita)
+
PIG-5201: Null handling on FLATTEN (knoguchi)
PIG-5315: pig.script is not set for scripts run via PigServer (satishsaley via rohini)
diff --git a/test/org/apache/pig/test/TestAssert.java b/test/org/apache/pig/test/TestAssert.java
index 3eef71e..52c7970 100644
--- a/test/org/apache/pig/test/TestAssert.java
+++ b/test/org/apache/pig/test/TestAssert.java
@@ -25,6 +25,7 @@
import java.io.InputStream;
import java.util.List;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.junit.Assert;
import org.apache.pig.PigServer;
@@ -118,7 +119,7 @@
} catch (FrontendException fe) {
if (pigServer.getPigContext().getExecType().toString().startsWith("TEZ")
|| pigServer.getPigContext().getExecType().toString().startsWith("SPARK")) {
- Assert.assertTrue(fe.getCause().getMessage().contains(
+ Assert.assertTrue(ExceptionUtils.getRootCause(fe).getMessage().contains(
"Assertion violated: i should be greater than 1"));
} else {
Assert.assertTrue(fe.getCause().getMessage().contains(
@@ -150,7 +151,7 @@
} catch (FrontendException fe) {
if (pigServer.getPigContext().getExecType().toString().startsWith("TEZ")
|| pigServer.getPigContext().getExecType().toString().startsWith("SPARK")) {
- Assert.assertTrue(fe.getCause().getMessage().contains(
+ Assert.assertTrue(ExceptionUtils.getRootCause(fe).getMessage().contains(
"Assertion violated: i should be greater than 1"));
} else {
Assert.assertTrue(fe.getCause().getMessage().contains(
diff --git a/test/org/apache/pig/test/TestEvalPipeline.java b/test/org/apache/pig/test/TestEvalPipeline.java
index 6480fbe..d846f4d 100644
--- a/test/org/apache/pig/test/TestEvalPipeline.java
+++ b/test/org/apache/pig/test/TestEvalPipeline.java
@@ -24,7 +24,6 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
diff --git a/test/org/apache/pig/test/TestEvalPipeline2.java b/test/org/apache/pig/test/TestEvalPipeline2.java
index e378254..d9c8041 100644
--- a/test/org/apache/pig/test/TestEvalPipeline2.java
+++ b/test/org/apache/pig/test/TestEvalPipeline2.java
@@ -34,6 +34,7 @@
import java.util.Properties;
import java.util.Random;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -1459,7 +1460,8 @@
pigServer.openIterator("b");
Assert.fail();
} catch (Exception e) {
- Assert.assertTrue(e.getMessage().contains(ArrayList.class.getName()));
+ Assert.assertTrue(ExceptionUtils.getRootCause(e).getMessage().contains(
+ "Unexpected data type " + ArrayList.class.getName() + " found in stream."));
}
}
finally {
diff --git a/test/org/apache/pig/test/TestGrunt.java b/test/org/apache/pig/test/TestGrunt.java
index ed84dba..1786693 100644
--- a/test/org/apache/pig/test/TestGrunt.java
+++ b/test/org/apache/pig/test/TestGrunt.java
@@ -915,16 +915,7 @@
}
@Test
- public void testKeepGoigFailed() throws Throwable {
- // in mr mode, the output file 'baz' will be automatically deleted if the mr job fails
- // when "cat baz;" is executed, it throws "Encountered IOException. Directory baz does not exist"
- // in GruntParser#processCat() and variable "caught" is true
- // in spark mode, the output file 'baz' will not be automatically deleted even the job fails(see SPARK-7953)
- // when "cat baz;" is executed, it does not throw exception and the variable "caught" is false
- // TODO: Enable this for Spark when SPARK-7953 is resolved
- Assume.assumeTrue(
- "Skip this test for Spark until SPARK-7953 is resolved!",
- !Util.isSparkExecType(cluster.getExecType()));
+ public void testKeepGoingFailed() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", "passwd");
diff --git a/test/org/apache/pig/test/TestScalarAliases.java b/test/org/apache/pig/test/TestScalarAliases.java
index 72fc817..c8771ec 100644
--- a/test/org/apache/pig/test/TestScalarAliases.java
+++ b/test/org/apache/pig/test/TestScalarAliases.java
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
import org.junit.AfterClass;
@@ -108,7 +109,7 @@
pigServer.openIterator("C");
fail("exception expected - scalar input has multiple rows");
} catch (IOException pe){
- Util.checkStrContainsSubStr(pe.getCause().getMessage(),
+ Util.checkStrContainsSubStr(ExceptionUtils.getRootCause(pe).getMessage(),
"Scalar has more than one row in the output"
);
}
diff --git a/test/org/apache/pig/test/TestStoreBase.java b/test/org/apache/pig/test/TestStoreBase.java
index 029269d..6e1ddbf 100644
--- a/test/org/apache/pig/test/TestStoreBase.java
+++ b/test/org/apache/pig/test/TestStoreBase.java
@@ -143,6 +143,8 @@
String outputFileName1 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
String outputFileName2 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
+ boolean isSpark2_2_plus = Util.isSpark2_2_plus();
+
Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
if (mode.toString().startsWith("SPARK")) {
filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE);
@@ -174,13 +176,21 @@
filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.FALSE);
filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE);
filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE);
- // OutputCommitter.abortTask will not be invoked in spark mode. Detail see SPARK-7953
- filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+ if (isSpark2_2_plus) {
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE);
+ } else {
+ // OutputCommitter.abortTask will not be invoked in spark mode before Spark 2.2.x. For details see SPARK-7953
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+ }
filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE);
filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE);
filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE);
- // OutputCommitter.abortJob will not be invoked in spark mode. Detail see SPARK-7953
- filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE);
+ if (isSpark2_2_plus) {
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE);
+ } else {
+ // OutputCommitter.abortJob will not be invoked in spark mode before Spark 2.2.x. For details see SPARK-7953
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE);
+ }
filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.FALSE);
filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE);
filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE);
@@ -218,8 +228,12 @@
if(mode.isLocal()) {
// MR LocalJobRunner does not call abortTask
- if (!mode.toString().startsWith("TEZ")) {
- filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+ if (!Util.isTezExecType(mode)) {
+ if (Util.isSparkExecType(mode) && isSpark2_2_plus) {
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE);
+ } else {
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+ }
filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE);
}
if (Util.isHadoop1_x()) {
diff --git a/test/org/apache/pig/test/Util.java b/test/org/apache/pig/test/Util.java
index d1c711b..788a72f 100644
--- a/test/org/apache/pig/test/Util.java
+++ b/test/org/apache/pig/test/Util.java
@@ -105,6 +105,7 @@
import org.apache.pig.parser.QueryParserDriver;
import org.apache.pig.tools.grunt.GruntParser;
import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.spark.package$;
import org.junit.Assert;
import com.google.common.base.Function;
@@ -400,7 +401,7 @@
}
FileStatus fileStatus = fs.getFileStatus(path);
FileStatus[] files;
- if (fileStatus.isDir()) {
+ if (fileStatus.isDirectory()) {
files = fs.listStatus(path, new PathFilter() {
@Override
public boolean accept(Path p) {
@@ -731,7 +732,7 @@
String line = null;
FileStatus fst = fs.getFileStatus(new Path(fileNameOnCluster));
- if(fst.isDir()) {
+ if(fst.isDirectory()) {
throw new IOException("Only files from cluster can be copied locally," +
" " + fileNameOnCluster + " is a directory");
}
@@ -1250,13 +1251,14 @@
LogicalSchema resultSchema = org.apache.pig.impl.util.Utils.parseSchema(schemaString);
checkQueryOutputsAfterSortRecursive(actualResultsIt, expectedResArray, resultSchema);
}
- /**
+
+ /**
* Helper function to check if the result of a Pig Query is in line with
* expected results. It sorts actual and expected string results before comparison
*
* @param actualResultsIt Result of the executed Pig query
* @param expectedResArray Expected string results to validate against
- * @param fs fieldSchema of expecteResArray
+ * @param schema fieldSchema of expectedResArray
* @throws IOException
*/
static public void checkQueryOutputsAfterSortRecursive(Iterator<Tuple> actualResultsIt,
@@ -1334,6 +1336,11 @@
return false;
}
+ public static boolean isSpark2_2_plus() throws IOException {
+ String sparkVersion = package$.MODULE$.SPARK_VERSION();
+ return sparkVersion != null && sparkVersion.matches("2\\.([\\d&&[^01]]|[\\d]{2,})\\..*");
+ }
+
public static void sortQueryOutputsIfNeed(List<Tuple> actualResList, boolean toSort){
if( toSort == true) {
for (Tuple t : actualResList) {