PIG-4911: Provide option to disable DAG recovery (rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1749958 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 0bdee4f..4c30074 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -24,6 +24,8 @@
  
 IMPROVEMENTS
 
+PIG-4911: Provide option to disable DAG recovery (rohini)
+
 PIG-4906: Add Bigdecimal functions in Over function (cgalan via daijy)
 
 PIG-2768: Fix org.apache.hadoop.conf.Configuration deprecation warnings for Hadoop 23 (rohini)
diff --git a/src/org/apache/pig/PigConfiguration.java b/src/org/apache/pig/PigConfiguration.java
index d80914f..a3a24d9 100644
--- a/src/org/apache/pig/PigConfiguration.java
+++ b/src/org/apache/pig/PigConfiguration.java
@@ -158,6 +158,12 @@
      * This key is used to configure grace parallelism in tez. Default is true.
      */
     public static final String PIG_TEZ_GRACE_PARALLELISM = "pig.tez.grace.parallelism";
+    /**
+     * Set this key to true to turn off Tez DAG recovery when a DAG uses auto
+     * parallelism. Default is false. Useful when running with Tez versions before
+     * Tez 0.8 which have issues with auto parallelism during DAG recovery.
+     */
+    public static final String PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY = "pig.tez.auto.parallelism.disable.dag.recovery";
 
     /**
      * This key is used to configure compression for the pig input splits which
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
index 1d39ea1..fb401b8 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
@@ -191,6 +191,8 @@
     private String mapTaskLaunchCmdOpts;
     private String reduceTaskLaunchCmdOpts;
 
+    private boolean disableDAGRecovery = false;
+
     public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
             Map<String, LocalResource> localResources) {
         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
@@ -210,6 +212,10 @@
         }
     }
 
+    public boolean shouldDisableDAGRecovery() {
+        return disableDAGRecovery;
+    }
+
     private void initialize(PigContext pc) throws IOException {
 
         this.globalConf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
@@ -781,6 +787,7 @@
 
         // Set the right VertexManagerPlugin
         if (tezOp.getEstimatedParallelism() != -1) {
+            boolean autoParallelism = false;
             if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
                 if (tezOp.getVertexParallelism()==-1 && (
                         tezOp.isGlobalSort() &&getPlan().getPredecessors(tezOp).size()==1||
@@ -789,6 +796,7 @@
                     // to decrease/increase parallelism of sorting vertex dynamically
                     // based on the numQuantiles calculated by sample aggregation vertex
                     vmPluginName = PartitionerDefinedVertexManager.class.getName();
+                    autoParallelism = true;
                     log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
                 }
             } else {
@@ -836,9 +844,13 @@
                         }
                     }
                     vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer);
+                    autoParallelism = true;
                     log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString());
                 }
             }
+            if (globalConf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY, false) && autoParallelism) {
+                disableDAGRecovery = true;
+            }
         }
         if (tezOp.isLimit() && (vmPluginName == null || vmPluginName.equals(PigGraceShuffleVertexManager.class.getName())||
                 vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
index 78f6f8c..80ef5ad 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
@@ -56,7 +56,7 @@
  */
 public class TezJob implements Runnable {
     private static final Log log = LogFactory.getLog(TezJob.class);
-    private Configuration conf;
+    private TezConfiguration conf;
     private EnumSet<StatusGetOpts> statusGetOpts;
     private Map<String, LocalResource> requestAMResources;
     private ApplicationId appId;
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
index 4502e3c..ddecddc 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
@@ -52,11 +52,12 @@
     private static final Log log = LogFactory.getLog(TezJobCompiler.class);
 
     private PigContext pigContext;
-    private TezConfiguration tezConf;
+    private Configuration conf;
+    private boolean disableDAGRecovery;
 
     public TezJobCompiler(PigContext pigContext, Configuration conf) throws IOException {
         this.pigContext = pigContext;
-        this.tezConf = new TezConfiguration(conf);
+        this.conf = conf;
     }
 
     public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, LocalResource> localResources)
@@ -66,6 +67,7 @@
         TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlanNode.getTezOperPlan(), tezDag, localResources);
         dagBuilder.visit();
         dagBuilder.avoidContainerReuseIfInputSplitInDisk();
+        disableDAGRecovery = dagBuilder.shouldDisableDAGRecovery();
         return tezDag;
     }
 
@@ -87,6 +89,7 @@
         return job;
     }
 
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     private TezJob getJob(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer)
             throws JobCreationException {
         try {
@@ -130,6 +133,12 @@
                 dagSetCallerContext.invoke(tezDag, context);
             }
             log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism());
+            TezConfiguration tezConf = new TezConfiguration(conf);
+            if (disableDAGRecovery
+                    && tezConf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
+                            TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
+                tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
+            }
             return new TezJob(tezConf, tezDag, localResources, tezPlan);
         } catch (Exception e) {
             int errCode = 2017;
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
index 52b28de..9fd377f 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
@@ -64,11 +64,17 @@
     private TezSessionManager() {
     }
 
-    public static class SessionInfo {
-        SessionInfo(TezClient session, Map<String, LocalResource> resources) {
+    private static class SessionInfo {
+
+        public SessionInfo(TezClient session, TezConfiguration config, Map<String, LocalResource> resources) {
             this.session = session;
+            this.config = config;
             this.resources = resources;
         }
+
+        public TezConfiguration getConfig() {
+            return config;
+        }
         public Map<String, LocalResource> getResources() {
             return resources;
         }
@@ -80,20 +86,21 @@
         }
         private TezClient session;
         private Map<String, LocalResource> resources;
+        private TezConfiguration config;
         private boolean inUse = false;
     }
 
     private static List<SessionInfo> sessionPool = new ArrayList<SessionInfo>();
 
-    private static SessionInfo createSession(Configuration conf,
+    private static SessionInfo createSession(TezConfiguration amConf,
             Map<String, LocalResource> requestedAMResources, Credentials creds,
             TezJobConfig tezJobConf) throws TezException, IOException,
             InterruptedException {
-        TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
+        MRToTezHelper.translateMRSettingsForTezAM(amConf);
         TezScriptState ss = TezScriptState.get();
         ss.addDAGSettingsToConf(amConf);
         adjustAMConfig(amConf, tezJobConf);
-        String jobName = conf.get(PigContext.JOB_NAME, "pig");
+        String jobName = amConf.get(PigContext.JOB_NAME, "pig");
         TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds);
         try {
             tezClient.start();
@@ -107,7 +114,7 @@
             tezClient.stop();
             throw new RuntimeException(e);
         }
-        return new SessionInfo(tezClient, requestedAMResources);
+        return new SessionInfo(tezClient, amConf, requestedAMResources);
     }
 
     private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) {
@@ -198,7 +205,22 @@
         return true;
     }
 
-    static TezClient getClient(Configuration conf, Map<String, LocalResource> requestedAMResources,
+    private static boolean validateSessionConfig(SessionInfo currentSession,
+            Configuration newSessionConfig)
+            throws TezException, IOException {
+        // A pooled session may be reused only when its config and the requested
+        // config agree on whether DAG recovery is enabled.
+        boolean pooledRecovery = currentSession.getConfig().getBoolean(
+                TezConfiguration.DAG_RECOVERY_ENABLED,
+                TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
+        boolean requestedRecovery = newSessionConfig.getBoolean(
+                TezConfiguration.DAG_RECOVERY_ENABLED,
+                TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
+        // Matching settings mean the session is compatible.
+        return pooledRecovery == requestedRecovery;
+    }
+
+    static TezClient getClient(TezConfiguration conf, Map<String, LocalResource> requestedAMResources,
             Credentials creds, TezJobConfig tezJobConf) throws TezException, IOException, InterruptedException {
         List<SessionInfo> sessionsToRemove = new ArrayList<SessionInfo>();
         SessionInfo newSession = null;
@@ -216,7 +238,8 @@
                         sessionsToRemove.add(sessionInfo);
                     } else if (!sessionInfo.inUse
                             && appMasterStatus.equals(TezAppMasterStatus.READY)
-                            && validateSessionResources(sessionInfo,requestedAMResources)) {
+                            && validateSessionResources(sessionInfo,requestedAMResources)
+                            && validateSessionConfig(sessionInfo, conf)) {
                         sessionInfo.inUse = true;
                         return sessionInfo.session;
                     }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
index f72931c..7e06d69 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
@@ -177,20 +177,15 @@
         }
     }
 
-    public static TezConfiguration getDAGAMConfFromMRConf(
-            Configuration tezConf) {
-
-        // Set Tez parameters based on MR parameters.
-        TezConfiguration dagAMConf = new TezConfiguration(tezConf);
-
+    public static void translateMRSettingsForTezAM(TezConfiguration dagAMConf) {
 
         convertMRToTezConf(dagAMConf, dagAMConf, DeprecatedKeys.getMRToDAGParamMap());
         convertMRToTezConf(dagAMConf, dagAMConf, mrAMParamToTezAMParamMap);
 
-        String env = tezConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
-        if (tezConf.get(MRJobConfig.MR_AM_ENV) != null) {
-            env = (env == null) ? tezConf.get(MRJobConfig.MR_AM_ENV)
-                                : env + "," + tezConf.get(MRJobConfig.MR_AM_ENV);
+        String env = dagAMConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
+        if (dagAMConf.get(MRJobConfig.MR_AM_ENV) != null) {
+            env = (env == null) ? dagAMConf.get(MRJobConfig.MR_AM_ENV)
+                                : env + "," + dagAMConf.get(MRJobConfig.MR_AM_ENV);
         }
 
         if (env != null) {
@@ -199,24 +194,23 @@
 
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
                 org.apache.tez.mapreduce.hadoop.MRHelpers
-                        .getJavaOptsForMRAM(tezConf));
+                        .getJavaOptsForMRAM(dagAMConf));
 
-        String queueName = tezConf.get(JobContext.QUEUE_NAME,
+        String queueName = dagAMConf.get(JobContext.QUEUE_NAME,
                 YarnConfiguration.DEFAULT_QUEUE_NAME);
         dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName);
 
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS,
-                tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
+                dagAMConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
 
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS,
-                tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
+                dagAMConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
 
         // Hardcoding at AM level instead of setting per vertex till TEZ-2710 is available
         dagAMConf.setIfUnset(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, "0.5");
 
         removeUnwantedSettings(dagAMConf, true);
 
-        return dagAMConf;
     }
 
     /**
diff --git a/test/org/apache/pig/tez/TestTezJobExecution.java b/test/org/apache/pig/tez/TestTezJobExecution.java
index c98ec81..97ede40 100644
--- a/test/org/apache/pig/tez/TestTezJobExecution.java
+++ b/test/org/apache/pig/tez/TestTezJobExecution.java
@@ -18,15 +18,25 @@
 package org.apache.pig.tez;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigRunner;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.RoundRobinPartitioner;
+import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.test.Util;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
+import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -39,12 +49,16 @@
 
     private static final String TEST_DIR = Util.getTestDirectory(TestTezJobExecution.class);
 
+    private static final String INPUT_FILE = TEST_DIR + Path.SEPARATOR + "input";
     private PigServer pigServer;
 
     @BeforeClass
     public static void oneTimeSetUp() throws Exception {
         Util.deleteDirectory(new File(TEST_DIR));
         new File(TEST_DIR).mkdirs();
+        Util.createLocalInputFile(INPUT_FILE, new String[] {
+            "1", "1", "1", "2", "2", "2"
+        });
     }
 
     @AfterClass
@@ -54,18 +68,14 @@
 
     @Before
     public void setUp() throws Exception {
-        pigServer = new PigServer(Util.getLocalTestMode());
+        pigServer = new PigServer("tez_local");
     }
 
     @Test
     public void testUnionParallelRoundRobinBatchSize() throws IOException {
-        String input = TEST_DIR + Path.SEPARATOR + "input1";
         String output = TEST_DIR + Path.SEPARATOR + "output1";
-        Util.createInputFile(pigServer.getPigContext(), input, new String[] {
-            "1", "1", "1", "2", "2", "2"
-        });
-        String query = "A = LOAD '" + input + "';"
-                + "B = LOAD '" + input + "';"
+        String query = "A = LOAD '" + INPUT_FILE + "';"
+                + "B = LOAD '" + INPUT_FILE + "';"
                 + "C = UNION A, B PARALLEL 2;"
                 + "STORE C into '" + output + "';";
         pigServer.getPigContext().getProperties().setProperty(
@@ -77,4 +87,100 @@
         assertEquals("2\n2\n2\n2\n2\n2\n", part1);
     }
 
+    /**
+     * Runs a script with two STOREs separated by exec, with and without DAG recovery
+     * disabled for auto parallelism, and checks how many distinct job ids the listener sees.
+     */
+    @Test
+    public void testDAGDiscoveryDisabled() throws IOException {
+        String output1 = TEST_DIR + Path.SEPARATOR + "output-parallel";
+        String output2 = TEST_DIR + Path.SEPARATOR + "output-autoparallel";
+        String scriptFile = TEST_DIR + Path.SEPARATOR + "testDAGRecoveryDisable.pig";
+        String query = "A = LOAD '" + INPUT_FILE + "';"
+                + "B = GROUP A BY $0 PARALLEL 1;"
+                + "STORE B into '" + output1 + "';"
+                + "exec;"
+                + "C = LOAD '" + INPUT_FILE + "';"
+                + "D = GROUP C BY $0;"
+                + "STORE D into '" + output2 + "';";
+        Util.createLocalInputFile(scriptFile, new String[] {query});
+        String[] args = { "-x", "tez_local", scriptFile };
+        TestNotificationListener listener = new TestNotificationListener();
+        // Recovery is not disabled when there is auto parallelism. Should reuse AM application session
+        PigStats stats = PigRunner.run(args, listener);
+        assertTrue(stats.isSuccessful());
+        assertEquals(1, listener.getJobsStarted().size());
+
+        // Remove the outputs so the second run can store to the same locations
+        Util.deleteFile(pigServer.getPigContext(), output1);
+        Util.deleteFile(pigServer.getPigContext(), output2);
+
+        // Recovery is disabled when there is auto parallelism. Should use two different AMs
+        listener.reset();
+        args = new String[] {
+                "-D" + PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY + "=true",
+                "-x", "tez_local",
+                scriptFile };
+        stats = PigRunner.run(args, listener);
+        assertTrue(stats.isSuccessful());
+        assertEquals(2, listener.getJobsStarted().size());
+    }
+
+
+    private static class TestNotificationListener implements PigProgressNotificationListener {
+        // Distinct job ids seen by jobStartedNotification; all other callbacks are no-ops.
+        private Set<String> jobsStarted = new HashSet<String>();
+
+        public void reset() {
+            this.jobsStarted.clear();
+        }
+
+        public Set<String> getJobsStarted() {
+            return jobsStarted;
+        }
+
+        @Override
+        public void initialPlanNotification(String scriptId,
+                OperatorPlan<?> plan) {
+        }
+
+        @Override
+        public void launchStartedNotification(String scriptId,
+                int numJobsToLaunch) {
+        }
+
+        @Override
+        public void jobsSubmittedNotification(String scriptId,
+                int numJobsSubmitted) {
+        }
+
+        @Override
+        public void jobStartedNotification(String scriptId, String assignedJobId) {
+            jobsStarted.add(assignedJobId);
+        }
+
+        @Override
+        public void jobFinishedNotification(String scriptId, JobStats jobStats) {
+        }
+
+        @Override
+        public void jobFailedNotification(String scriptId, JobStats jobStats) {
+        }
+
+        @Override
+        public void outputCompletedNotification(String scriptId,
+                OutputStats outputStats) {
+        }
+
+        @Override
+        public void progressUpdatedNotification(String scriptId, int progress) {
+            // Intentionally empty: progress updates are not tracked.
+        }
+
+        @Override
+        public void launchCompletedNotification(String scriptId,
+                int numJobsSucceeded) {
+        }
+
+    }
+
 }
diff --git a/test/org/apache/pig/tez/TestTezLauncher.java b/test/org/apache/pig/tez/TestTezLauncher.java
index d5aecab..9ffd8f8 100644
--- a/test/org/apache/pig/tez/TestTezLauncher.java
+++ b/test/org/apache/pig/tez/TestTezLauncher.java
@@ -23,7 +23,6 @@
 import java.util.Arrays;
 import java.util.Iterator;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
@@ -35,6 +34,7 @@
 import org.apache.pig.test.MiniGenericCluster;
 import org.apache.pig.test.Util;
 import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -124,11 +124,11 @@
 
     @Test
     public void testQueueName() throws Exception {
-        Configuration conf = new Configuration();
+        TezConfiguration conf = new TezConfiguration();
         conf.set("tez.queue.name", "special");
-        conf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
+        MRToTezHelper.translateMRSettingsForTezAM(conf);
         assertEquals(conf.get("tez.queue.name"), "special");
-        
+
     }
 }