PIG-5305: Enable yarn-client mode execution of tests in Spark (1) mode (szita)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1811322 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 09c360c..5f399aa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
  
 IMPROVEMENTS
 
+PIG-5305: Enable yarn-client mode execution of tests in Spark (1) mode (szita)
+
 PIG-4120: Broadcast the index file in case of POMergeCoGroup and POMergeJoin (satishsaley via rohini)
 
 PIG-5306: REGEX_EXTRACT() logs every line that doesn't match (satishsaley via rohini)
diff --git a/build.xml b/build.xml
index 2aedce8..531538a 100644
--- a/build.xml
+++ b/build.xml
@@ -118,6 +118,7 @@
     <property name="smoke.tests.jarfile" value="${build.dir}/${final.name}-smoketests.jar" />
     <property name="test.pigunit.src.dir" value="${test.src.dir}/org/apache/pig/test/pigunit" />
     <property name="test.pigunit.file" value="${test.src.dir}/pigunit-tests"/>
+    <property name="pigtest.jarfile" value="pigtest.jar" />
 
 
     <!-- test configuration, use ${user.home}/build.properties to configure values  -->
@@ -160,7 +161,6 @@
         <propertyreset name="isHadoop2" value="true" />
         <propertyreset name="src.shims.dir" value="${basedir}/shims/src/hadoop${hadoopversion}" />
         <propertyreset name="src.shims.test.dir" value="${basedir}/shims/test/hadoop${hadoopversion}" />
-        <propertyreset name="src.exclude.dir" value="" />
         <propertyreset name="test.exec.type" value="tez" />
     </target>
 
@@ -278,6 +278,7 @@
               value="${mvnrepo}/org/codehaus/jackson/jackson-core-asl/${jackson-pig-3039-test.version}/jackson-core-asl-${jackson-pig-3039-test.version}.jar"/>
     <property name="jackson_mapper_repo_url"
               value="${mvnrepo}/org/codehaus/jackson/jackson-mapper-asl/${jackson-pig-3039-test.version}/jackson-mapper-asl-${jackson-pig-3039-test.version}.jar"/>
+    <property name="test.spark.spark_master" value="yarn-client" />
 
     <!--this is the naming policy for artifacts we want pulled down-->
     <property name="ivy.artifact.retrieve.pattern" value="${ant.project.name}/[artifact]-[revision](-[classifier]).[ext]"/>
@@ -893,32 +894,32 @@
     <!-- ================================================================== -->
     <!-- Run unit tests                                                     -->
     <!-- ================================================================== -->
-    <target name="test-core" depends="setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check,jackson-pig-3039-test-download" description="Run full set of unit tests">
+    <target name="test-core" depends="setWindowsPath,setLinuxPath,compile-test,pigtest-jar,debugger.check,jackson-pig-3039-test-download" description="Run full set of unit tests">
         <macro-test-runner test.file="${test.all.file}" tests.failed="test-core.failed" />
         <fail if="test-core.failed">Tests failed!</fail>
     </target>
 
-    <target name="test-commit" depends="setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check" description="Run approximate 10-minute set of unit tests prior to commiting">
+    <target name="test-commit" depends="setWindowsPath,setLinuxPath,compile-test,pigtest-jar,debugger.check" description="Run approximate 10-minute set of unit tests prior to commiting">
         <macro-test-runner test.file="${test.commit.file}" tests.failed="test-commit.failed"/>
         <fail if="test-commit.failed">Tests failed!</fail>
     </target>
 
-    <target name="test-unit" depends="setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check" description="Run all true unit tests">
+    <target name="test-unit" depends="setWindowsPath,setLinuxPath,compile-test,pigtest-jar,debugger.check" description="Run all true unit tests">
         <macro-test-runner test.file="${test.unit.file}" tests.failed="test-unit.failed"/>
         <fail if="test-unit.failed">Tests failed!</fail>
     </target>
 
-    <target name="test-smoke" depends="setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check" description="Run 30 min smoke tests">
+    <target name="test-smoke" depends="setWindowsPath,setLinuxPath,compile-test,pigtest-jar,debugger.check" description="Run 30 min smoke tests">
         <macro-test-runner test.file="${test.smoke.file}" tests.failed="test-smoke.failed"/>
         <fail if="test-smoke.failed">Tests failed!</fail>
     </target>
 
-    <target name="test-tez" depends="setTezEnv,setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check,jackson-pig-3039-test-download" description="Run tez unit tests">
+    <target name="test-tez" depends="setTezEnv,setWindowsPath,setLinuxPath,compile-test,debugger.check,jackson-pig-3039-test-download" description="Run tez unit tests">
         <macro-test-runner test.file="${test.all.file}" tests.failed="test-tez.failed"/>
         <fail if="test-tez.failed">Tests failed!</fail>
     </target>
 
-    <target name="test-spark" depends="setSparkEnv,setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check,jackson-pig-3039-test-download" description="Run Spark unit tests in Spark cluster-local mode">
+    <target name="test-spark" depends="setSparkEnv,setWindowsPath,setLinuxPath,compile-test,pigtest-jar,debugger.check,jackson-pig-3039-test-download" description="Run Spark unit tests in Spark cluster-local mode">
         <macro-test-runner test.file="${test.all.file}" tests.failed="test-spark.failed"/>
         <fail if="test-spark.failed">Tests failed!</fail>
     </target>
@@ -957,6 +958,7 @@
             <sysproperty key="java.security.krb5.kdc" value="" />
             <sysproperty key="log4j.configuration" value="file:${basedir}/conf/test-log4j.properties"/>
             <env key="MALLOC_ARENA_MAX" value="4"/>
+            <env key="SPARK_MASTER" value="${test.spark.spark_master}"/>
             <env key="PATH" path="${build.path}"/>
             <classpath>
                 <pathelement location="${output.jarfile.core}" />
@@ -999,7 +1001,7 @@
     </target>
 
     <target name="test-core-mrtez" description="run core tests on both mr and tez mode"
-            depends="setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check,jackson-pig-3039-test-download">
+            depends="setWindowsPath,setLinuxPath,compile-test,debugger.check,jackson-pig-3039-test-download">
         <fail message="hadoopversion must be set to 2 when invoking test-core-mrtez">
           <condition>
             <not>
@@ -1072,6 +1074,16 @@
         <ant dir="${pigmix.dir}" target="test"/>
     </target>
 
+    <target name="pigtest-jar" depends="compile-test, ivy-test" description="create the pigtest jar file">
+        <echo> *** Creating pigtest.jar ***</echo>
+        <jar destfile="${pigtest.jarfile}">
+            <fileset dir="${test.build.classes}">
+                <include name="**/org/apache/pig/test/**"/>
+            </fileset>
+            <zipfileset src="${ivy.lib.dir}/commons-lang-${commons-lang.version}.jar" />
+        </jar>
+    </target>
+
     <!-- ================================================================== -->
     <!-- Pigunit                                                            -->
     <!-- ================================================================== -->
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java b/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
index 7bf059b..3143987 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
@@ -431,6 +431,7 @@
         Set<String> allJars = new HashSet<String>();
         LOG.info("Add default jars to Spark Job");
         allJars.addAll(JarManager.getDefaultJars());
+        JarManager.addPigTestJarIfPresent(allJars);
         LOG.info("Add script jars to Spark Job");
         for (String scriptJar : pigContext.scriptJars) {
             allJars.add(scriptJar);
@@ -536,23 +537,35 @@
         return sparkPlan;
     }
 
+
+    private static String getMaster(PigContext pc){
+        String master = null;
+        if (pc.getExecType().isLocal()) {
+            master = "local";
+        } else {
+            master = System.getenv("SPARK_MASTER");
+            if (master == null) {
+                LOG.info("SPARK_MASTER not specified, using \"local\"");
+                master = "local";
+            }
+        }
+        return master;
+    }
+
     /**
      * Only one SparkContext may be active per JVM (SPARK-2243). When multiple threads start SparkLaucher,
-     * the static member sparkContext should be initialized only once
+     * the static member sparkContext should be initialized with only one mode (local or cluster) at a time.
+     *
+     * In case it was already initialized with a different mode than what the new pigContext instance wants, it will
+     * close down the existing SparkContext and re-initialize it with the new mode.
      */
     private static synchronized void startSparkIfNeeded(JobConf jobConf, PigContext pc) throws PigException {
+        String master = getMaster(pc);
+        if (sparkContext != null && !master.equals(sparkContext.master())){
+            sparkContext.close();
+            sparkContext = null;
+        }
         if (sparkContext == null) {
-            String master = null;
-            if (pc.getExecType().isLocal()) {
-                master = "local";
-            } else {
-                master = System.getenv("SPARK_MASTER");
-                if (master == null) {
-                    LOG.info("SPARK_MASTER not specified, using \"local\"");
-                    master = "local";
-                }
-            }
-
             String sparkHome = System.getenv("SPARK_HOME");
             if (!master.startsWith("local") && !master.equals("yarn-client")) {
                 // Check that we have the Mesos native library and Spark home
@@ -590,8 +603,10 @@
                 }
             }
 
-            //see PIG-5200 why need to set spark.executor.userClassPathFirst as true
-            sparkConf.set("spark.executor.userClassPathFirst", "true");
+            //see PIG-5200 why need to set spark.executor.userClassPathFirst as true on cluster modes
+            if (! "local".equals(master)) {
+                sparkConf.set("spark.executor.userClassPathFirst", "true");
+            }
             checkAndConfigureDynamicAllocation(master, sparkConf);
 
             sparkContext = new JavaSparkContext(sparkConf);
diff --git a/src/org/apache/pig/impl/util/JarManager.java b/src/org/apache/pig/impl/util/JarManager.java
index dafc42b..e6c9215 100644
--- a/src/org/apache/pig/impl/util/JarManager.java
+++ b/src/org/apache/pig/impl/util/JarManager.java
@@ -58,6 +58,7 @@
 public class JarManager {
 
     private static Log log = LogFactory.getLog(JarManager.class);
+    private static final String PIGTEST_JAR = "pigtest.jar";
 
     private static enum DefaultPigPackages {
 
@@ -306,4 +307,15 @@
         }
     }
 
+    /**
+     * Adds jar file where pig test classes are packed (build/test/classes)
+     * @param jars Set of jars to append pig tests jar into
+     */
+    public static void addPigTestJarIfPresent(Set<String> jars) {
+        File file = new File(PIGTEST_JAR);
+        if (file.exists()) {
+            jars.add(file.getAbsolutePath());
+        }
+    }
+
 }
diff --git a/test/org/apache/pig/builtin/TestOrcStoragePushdown.java b/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
index a6a3834..f0dc151 100644
--- a/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
+++ b/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
@@ -119,7 +119,7 @@
     }
 
     private static void createInputData() throws Exception {
-        pigServer = new PigServer(Util.getLocalTestMode());
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
 
         new File(inpbasedir).mkdirs();
         new File(outbasedir).mkdirs();
@@ -162,10 +162,11 @@
         }
         bw.close();
 
+        Util.copyFromLocalToCluster(cluster, inputTxtFile, inputTxtFile);
+
         // Store only 1000 rows in each row block (MIN_ROW_INDEX_STRIDE is 1000. So can't use less than that)
         pigServer.registerQuery("A = load '" + Util.generateURI(inputTxtFile, pigServer.getPigContext()) + "' as (f1:boolean, f2:int, f3:int, f4:int, f5:long, f6:float, f7:double, f8:bytearray, f9:chararray, f10:datetime, f11:bigdecimal);");
-        pigServer.registerQuery("store A into '" + Util.generateURI(INPUT, pigServer.getPigContext()) +"' using OrcStorage('-r 1000 -s 100000');");
-        Util.copyFromLocalToCluster(cluster, INPUT, INPUT);
+        pigServer.registerQuery("store A into '" + INPUT +"' using OrcStorage('-r 1000 -s 100000');");
     }
 
     @AfterClass