PIG-5318: Unit test failures on Pig on Spark with Spark 2.2 (nkollar via szita)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1817995 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index e3d869a..bdd211b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -60,6 +60,8 @@
  
 BUG FIXES
 
+PIG-5318: Unit test failures on Pig on Spark with Spark 2.2 (nkollar via szita)
+
 PIG-5201: Null handling on FLATTEN (knoguchi)
 
 PIG-5315: pig.script is not set for scripts run via PigServer (satishsaley via rohini)
diff --git a/test/org/apache/pig/test/TestAssert.java b/test/org/apache/pig/test/TestAssert.java
index 3eef71e..52c7970 100644
--- a/test/org/apache/pig/test/TestAssert.java
+++ b/test/org/apache/pig/test/TestAssert.java
@@ -25,6 +25,7 @@
 import java.io.InputStream;
 import java.util.List;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.junit.Assert;
 
 import org.apache.pig.PigServer;
@@ -118,7 +119,7 @@
       } catch (FrontendException fe) {
           if (pigServer.getPigContext().getExecType().toString().startsWith("TEZ")
                   || pigServer.getPigContext().getExecType().toString().startsWith("SPARK")) {
-              Assert.assertTrue(fe.getCause().getMessage().contains(
+              Assert.assertTrue(ExceptionUtils.getRootCause(fe).getMessage().contains(
                       "Assertion violated: i should be greater than 1"));
           } else {
               Assert.assertTrue(fe.getCause().getMessage().contains(
@@ -150,7 +151,7 @@
       } catch (FrontendException fe) {
           if (pigServer.getPigContext().getExecType().toString().startsWith("TEZ")
                   || pigServer.getPigContext().getExecType().toString().startsWith("SPARK")) {
-              Assert.assertTrue(fe.getCause().getMessage().contains(
+              Assert.assertTrue(ExceptionUtils.getRootCause(fe).getMessage().contains(
                       "Assertion violated: i should be greater than 1"));
           } else {
               Assert.assertTrue(fe.getCause().getMessage().contains(
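
Illustration, not part of the patch: the switch from fe.getCause() to ExceptionUtils.getRootCause(fe) works because commons-lang3's getRootCause walks the entire cause chain, while getCause() unwraps only one level; on Spark 2.2 the FrontendException is assumed to arrive with an extra wrapping layer, so the original assertion message sits deeper in the chain. A minimal Java sketch with made-up exception nesting:

    import org.apache.commons.lang3.exception.ExceptionUtils;

    public class RootCauseDemo {
        public static void main(String[] args) {
            Exception root = new RuntimeException("Assertion violated: i should be greater than 1");
            Exception mid  = new RuntimeException("job aborted", root);
            Exception top  = new RuntimeException("frontend error", mid);
            // One level only: prints "job aborted", so contains("Assertion violated...") would fail
            System.out.println(top.getCause().getMessage());
            // Full chain: prints "Assertion violated: i should be greater than 1"
            System.out.println(ExceptionUtils.getRootCause(top).getMessage());
        }
    }
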
diff --git a/test/org/apache/pig/test/TestEvalPipeline.java b/test/org/apache/pig/test/TestEvalPipeline.java
index 6480fbe..d846f4d 100644
--- a/test/org/apache/pig/test/TestEvalPipeline.java
+++ b/test/org/apache/pig/test/TestEvalPipeline.java
@@ -24,7 +24,6 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
diff --git a/test/org/apache/pig/test/TestEvalPipeline2.java b/test/org/apache/pig/test/TestEvalPipeline2.java
index e378254..d9c8041 100644
--- a/test/org/apache/pig/test/TestEvalPipeline2.java
+++ b/test/org/apache/pig/test/TestEvalPipeline2.java
@@ -34,6 +34,7 @@
 import java.util.Properties;
 import java.util.Random;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -1459,7 +1460,8 @@
                 pigServer.openIterator("b");
                 Assert.fail();
             } catch (Exception e) {
-                Assert.assertTrue(e.getMessage().contains(ArrayList.class.getName()));
+                Assert.assertTrue(ExceptionUtils.getRootCause(e).getMessage().contains(
+                        "Unexpected data type " + ArrayList.class.getName() + " found in stream."));
             }
         }
         finally {
diff --git a/test/org/apache/pig/test/TestGrunt.java b/test/org/apache/pig/test/TestGrunt.java
index ed84dba..1786693 100644
--- a/test/org/apache/pig/test/TestGrunt.java
+++ b/test/org/apache/pig/test/TestGrunt.java
@@ -915,16 +915,7 @@
     }
 
     @Test
-    public void testKeepGoigFailed() throws Throwable {
-        // in mr mode, the output file 'baz' will be automatically deleted if the mr job fails
-        // when "cat baz;" is executed, it throws "Encountered IOException. Directory baz does not exist"
-        // in GruntParser#processCat() and variable "caught" is true
-        // in spark mode, the output file 'baz' will not be automatically deleted even the job fails(see SPARK-7953)
-        // when "cat baz;" is executed, it does not throw exception and the variable "caught" is false
-        // TODO: Enable this for Spark when SPARK-7953 is resolved
-        Assume.assumeTrue(
-            "Skip this test for Spark until SPARK-7953 is resolved!",
-            !Util.isSparkExecType(cluster.getExecType()));
+    public void testKeepGoingFailed() throws Throwable {
         PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
         PigContext context = server.getPigContext();
         Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", "passwd");
diff --git a/test/org/apache/pig/test/TestScalarAliases.java b/test/org/apache/pig/test/TestScalarAliases.java
index 72fc817..c8771ec 100644
--- a/test/org/apache/pig/test/TestScalarAliases.java
+++ b/test/org/apache/pig/test/TestScalarAliases.java
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.Tuple;
 import org.junit.AfterClass;
@@ -108,7 +109,7 @@
             pigServer.openIterator("C");
             fail("exception expected - scalar input has multiple rows");
         } catch (IOException pe){
-            Util.checkStrContainsSubStr(pe.getCause().getMessage(),
+            Util.checkStrContainsSubStr(ExceptionUtils.getRootCause(pe).getMessage(),
                     "Scalar has more than one row in the output"
             );
         }
diff --git a/test/org/apache/pig/test/TestStoreBase.java b/test/org/apache/pig/test/TestStoreBase.java
index 029269d..6e1ddbf 100644
--- a/test/org/apache/pig/test/TestStoreBase.java
+++ b/test/org/apache/pig/test/TestStoreBase.java
@@ -143,6 +143,8 @@
         String outputFileName1 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
         String outputFileName2 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
 
+        boolean isSpark2_2_plus = Util.isSpark2_2_plus();
+
         Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
         if (mode.toString().startsWith("SPARK")) {
             filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE);
@@ -174,13 +176,21 @@
             filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE);
-            // OutputCommitter.abortTask will not be invoked in spark mode. Detail see SPARK-7953
-            filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+            if (isSpark2_2_plus) {
+                filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE);
+            } else {
+                // OutputCommitter.abortTask is not invoked in Spark mode before Spark 2.2.x; see SPARK-7953 for details
+                filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+            }
             filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE);
-            // OutputCommitter.abortJob will not be invoked in spark mode. Detail see SPARK-7953
-            filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE);
+            if (isSpark2_2_plus) {
+                filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE);
+            } else {
+                // OutputCommitter.abortJob is not invoked in Spark mode before Spark 2.2.x; see SPARK-7953 for details
+                filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE);
+            }
             filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE);
@@ -218,8 +228,12 @@
 
         if(mode.isLocal()) {
             // MR LocalJobRunner does not call abortTask
-            if (!mode.toString().startsWith("TEZ")) {
-                filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+            if (!Util.isTezExecType(mode)) {
+                if (Util.isSparkExecType(mode) && isSpark2_2_plus) {
+                    filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE);
+                } else {
+                    filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+                }
                 filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE);
             }
             if (Util.isHadoop1_x()) {
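
Illustration, not part of the patch: filesToVerify pairs each DummyOutputCommitter marker file with whether it should exist after the failed store; on Spark 2.2+ the abortTask/abortJob markers become expected because SPARK-7953 is fixed there. A hypothetical consumer of the map (helper name and local-filesystem check are assumptions, not taken from this test) might look like:

    // Sketch only: assert the expected presence/absence of each marker file
    static void verifyMarkerFiles(java.util.Map<String, Boolean> filesToVerify) {
        for (java.util.Map.Entry<String, Boolean> entry : filesToVerify.entrySet()) {
            boolean exists = new java.io.File(entry.getKey()).exists();
            org.junit.Assert.assertEquals("Unexpected state for " + entry.getKey(),
                    entry.getValue().booleanValue(), exists);
        }
    }
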
diff --git a/test/org/apache/pig/test/Util.java b/test/org/apache/pig/test/Util.java
index d1c711b..788a72f 100644
--- a/test/org/apache/pig/test/Util.java
+++ b/test/org/apache/pig/test/Util.java
@@ -105,6 +105,7 @@
 import org.apache.pig.parser.QueryParserDriver;
 import org.apache.pig.tools.grunt.GruntParser;
 import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.spark.package$;
 import org.junit.Assert;
 
 import com.google.common.base.Function;
@@ -400,7 +401,7 @@
         }
         FileStatus fileStatus = fs.getFileStatus(path);
         FileStatus[] files;
-        if (fileStatus.isDir()) {
+        if (fileStatus.isDirectory()) {
             files = fs.listStatus(path, new PathFilter() {
                 @Override
                 public boolean accept(Path p) {
@@ -731,7 +732,7 @@
 
         String line = null;
  	   FileStatus fst = fs.getFileStatus(new Path(fileNameOnCluster));
- 	   if(fst.isDir()) {
+ 	   if(fst.isDirectory()) {
  	       throw new IOException("Only files from cluster can be copied locally," +
  	       		" " + fileNameOnCluster + " is a directory");
  	   }
@@ -1250,13 +1251,14 @@
         LogicalSchema resultSchema = org.apache.pig.impl.util.Utils.parseSchema(schemaString);
         checkQueryOutputsAfterSortRecursive(actualResultsIt, expectedResArray, resultSchema);
     }
-          /**
+
+    /**
      * Helper function to check if the result of a Pig Query is in line with
      * expected results. It sorts actual and expected string results before comparison
      *
      * @param actualResultsIt Result of the executed Pig query
      * @param expectedResArray Expected string results to validate against
-     * @param fs fieldSchema of expecteResArray
+     * @param schema schema of expectedResArray
      * @throws IOException
      */
     static public void checkQueryOutputsAfterSortRecursive(Iterator<Tuple> actualResultsIt,
@@ -1334,6 +1336,11 @@
         return false;
     }
 
+    public static boolean isSpark2_2_plus() throws IOException {
+        String sparkVersion = package$.MODULE$.SPARK_VERSION();
+        return sparkVersion != null && sparkVersion.matches("2\\.([\\d&&[^01]]|[\\d]{2,})\\..*");
+    }
+
     public static void sortQueryOutputsIfNeed(List<Tuple> actualResList, boolean toSort){
         if( toSort == true) {
             for (Tuple t : actualResList) {
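
Illustration, not part of the patch: the pattern in isSpark2_2_plus() accepts 2.x versions whose minor part is either a single digit from 2-9 or two or more digits, and rejects 1.x as well as 2.0/2.1. A standalone check of the same pattern (class name is illustrative only):

    public class SparkVersionPatternDemo {
        public static void main(String[] args) {
            String pattern = "2\\.([\\d&&[^01]]|[\\d]{2,})\\..*";
            for (String v : new String[] {"1.6.3", "2.1.1", "2.2.0", "2.3.1", "2.10.0", "3.0.0"}) {
                // Prints true only for 2.2.0, 2.3.1 and 2.10.0
                System.out.println(v + " -> " + v.matches(pattern));
            }
        }
    }
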