PIG-4911: Provide option to disable DAG recovery (rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1749958 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 0bdee4f..4c30074 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -24,6 +24,8 @@
IMPROVEMENTS
+PIG-4911: Provide option to disable DAG recovery (rohini)
+
PIG-4906: Add Bigdecimal functions in Over function (cgalan via daijy)
PIG-2768: Fix org.apache.hadoop.conf.Configuration deprecation warnings for Hadoop 23 (rohini)
diff --git a/src/org/apache/pig/PigConfiguration.java b/src/org/apache/pig/PigConfiguration.java
index d80914f..a3a24d9 100644
--- a/src/org/apache/pig/PigConfiguration.java
+++ b/src/org/apache/pig/PigConfiguration.java
@@ -158,6 +158,12 @@
* This key is used to configure grace parallelism in tez. Default is true.
*/
public static final String PIG_TEZ_GRACE_PARALLELISM = "pig.tez.grace.parallelism";
+ /**
+ * This key is used to turn off dag recovery if there is auto parallelism.
+ * Default is false. Useful when running with Tez versions before Tez 0.8
+ * which have issues with auto parallelism during DAG recovery.
+ */
+ public static final String PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY = "pig.tez.auto.parallelism.disable.dag.recovery";
/**
* This key is used to configure compression for the pig input splits which
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
index 1d39ea1..fb401b8 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
@@ -191,6 +191,8 @@
private String mapTaskLaunchCmdOpts;
private String reduceTaskLaunchCmdOpts;
+ private boolean disableDAGRecovery = false;
+
public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
Map<String, LocalResource> localResources) {
super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
@@ -210,6 +212,10 @@
}
}
+ public boolean shouldDisableDAGRecovery() {
+ return disableDAGRecovery;
+ }
+
private void initialize(PigContext pc) throws IOException {
this.globalConf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
@@ -781,6 +787,7 @@
// Set the right VertexManagerPlugin
if (tezOp.getEstimatedParallelism() != -1) {
+ boolean autoParallelism = false;
if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
if (tezOp.getVertexParallelism()==-1 && (
tezOp.isGlobalSort() &&getPlan().getPredecessors(tezOp).size()==1||
@@ -789,6 +796,7 @@
// to decrease/increase parallelism of sorting vertex dynamically
// based on the numQuantiles calculated by sample aggregation vertex
vmPluginName = PartitionerDefinedVertexManager.class.getName();
+ autoParallelism = true;
log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
}
} else {
@@ -836,9 +844,13 @@
}
}
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer);
+ autoParallelism = true;
log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString());
}
}
+ if (globalConf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY, false) && autoParallelism) {
+ disableDAGRecovery = true;
+ }
}
if (tezOp.isLimit() && (vmPluginName == null || vmPluginName.equals(PigGraceShuffleVertexManager.class.getName())||
vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
index 78f6f8c..80ef5ad 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
@@ -56,7 +56,7 @@
*/
public class TezJob implements Runnable {
private static final Log log = LogFactory.getLog(TezJob.class);
- private Configuration conf;
+ private TezConfiguration conf;
private EnumSet<StatusGetOpts> statusGetOpts;
private Map<String, LocalResource> requestAMResources;
private ApplicationId appId;
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
index 4502e3c..ddecddc 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
@@ -52,11 +52,12 @@
private static final Log log = LogFactory.getLog(TezJobCompiler.class);
private PigContext pigContext;
- private TezConfiguration tezConf;
+ private Configuration conf;
+ private boolean disableDAGRecovery;
public TezJobCompiler(PigContext pigContext, Configuration conf) throws IOException {
this.pigContext = pigContext;
- this.tezConf = new TezConfiguration(conf);
+ this.conf = conf;
}
public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, LocalResource> localResources)
@@ -66,6 +67,7 @@
TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlanNode.getTezOperPlan(), tezDag, localResources);
dagBuilder.visit();
dagBuilder.avoidContainerReuseIfInputSplitInDisk();
+ disableDAGRecovery = dagBuilder.shouldDisableDAGRecovery();
return tezDag;
}
@@ -87,6 +89,7 @@
return job;
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
private TezJob getJob(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer)
throws JobCreationException {
try {
@@ -130,6 +133,12 @@
dagSetCallerContext.invoke(tezDag, context);
}
log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism());
+ TezConfiguration tezConf = new TezConfiguration(conf);
+ if (disableDAGRecovery
+ && tezConf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
+ TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
+ tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
+ }
return new TezJob(tezConf, tezDag, localResources, tezPlan);
} catch (Exception e) {
int errCode = 2017;
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
index 52b28de..9fd377f 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
@@ -64,11 +64,17 @@
private TezSessionManager() {
}
- public static class SessionInfo {
- SessionInfo(TezClient session, Map<String, LocalResource> resources) {
+ private static class SessionInfo {
+
+ public SessionInfo(TezClient session, TezConfiguration config, Map<String, LocalResource> resources) {
this.session = session;
+ this.config = config;
this.resources = resources;
}
+
+ public TezConfiguration getConfig() {
+ return config;
+ }
public Map<String, LocalResource> getResources() {
return resources;
}
@@ -80,20 +86,21 @@
}
private TezClient session;
private Map<String, LocalResource> resources;
+ private TezConfiguration config;
private boolean inUse = false;
}
private static List<SessionInfo> sessionPool = new ArrayList<SessionInfo>();
- private static SessionInfo createSession(Configuration conf,
+ private static SessionInfo createSession(TezConfiguration amConf,
Map<String, LocalResource> requestedAMResources, Credentials creds,
TezJobConfig tezJobConf) throws TezException, IOException,
InterruptedException {
- TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
+ MRToTezHelper.translateMRSettingsForTezAM(amConf);
TezScriptState ss = TezScriptState.get();
ss.addDAGSettingsToConf(amConf);
adjustAMConfig(amConf, tezJobConf);
- String jobName = conf.get(PigContext.JOB_NAME, "pig");
+ String jobName = amConf.get(PigContext.JOB_NAME, "pig");
TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds);
try {
tezClient.start();
@@ -107,7 +114,7 @@
tezClient.stop();
throw new RuntimeException(e);
}
- return new SessionInfo(tezClient, requestedAMResources);
+ return new SessionInfo(tezClient, amConf, requestedAMResources);
}
private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) {
@@ -198,7 +205,22 @@
return true;
}
- static TezClient getClient(Configuration conf, Map<String, LocalResource> requestedAMResources,
+ private static boolean validateSessionConfig(SessionInfo currentSession,
+ Configuration newSessionConfig)
+ throws TezException, IOException {
+ // If DAG recovery is disabled for one and enabled for another, do not reuse
+ if (currentSession.getConfig().getBoolean(
+ TezConfiguration.DAG_RECOVERY_ENABLED,
+ TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)
+ != newSessionConfig.getBoolean(
+ TezConfiguration.DAG_RECOVERY_ENABLED,
+ TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
+ return false;
+ }
+ return true;
+ }
+
+ static TezClient getClient(TezConfiguration conf, Map<String, LocalResource> requestedAMResources,
Credentials creds, TezJobConfig tezJobConf) throws TezException, IOException, InterruptedException {
List<SessionInfo> sessionsToRemove = new ArrayList<SessionInfo>();
SessionInfo newSession = null;
@@ -216,7 +238,8 @@
sessionsToRemove.add(sessionInfo);
} else if (!sessionInfo.inUse
&& appMasterStatus.equals(TezAppMasterStatus.READY)
- && validateSessionResources(sessionInfo,requestedAMResources)) {
+ && validateSessionResources(sessionInfo,requestedAMResources)
+ && validateSessionConfig(sessionInfo, conf)) {
sessionInfo.inUse = true;
return sessionInfo.session;
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
index f72931c..7e06d69 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
@@ -177,20 +177,15 @@
}
}
- public static TezConfiguration getDAGAMConfFromMRConf(
- Configuration tezConf) {
-
- // Set Tez parameters based on MR parameters.
- TezConfiguration dagAMConf = new TezConfiguration(tezConf);
-
+ public static void translateMRSettingsForTezAM(TezConfiguration dagAMConf) {
convertMRToTezConf(dagAMConf, dagAMConf, DeprecatedKeys.getMRToDAGParamMap());
convertMRToTezConf(dagAMConf, dagAMConf, mrAMParamToTezAMParamMap);
- String env = tezConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
- if (tezConf.get(MRJobConfig.MR_AM_ENV) != null) {
- env = (env == null) ? tezConf.get(MRJobConfig.MR_AM_ENV)
- : env + "," + tezConf.get(MRJobConfig.MR_AM_ENV);
+ String env = dagAMConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
+ if (dagAMConf.get(MRJobConfig.MR_AM_ENV) != null) {
+ env = (env == null) ? dagAMConf.get(MRJobConfig.MR_AM_ENV)
+ : env + "," + dagAMConf.get(MRJobConfig.MR_AM_ENV);
}
if (env != null) {
@@ -199,24 +194,23 @@
dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
org.apache.tez.mapreduce.hadoop.MRHelpers
- .getJavaOptsForMRAM(tezConf));
+ .getJavaOptsForMRAM(dagAMConf));
- String queueName = tezConf.get(JobContext.QUEUE_NAME,
+ String queueName = dagAMConf.get(JobContext.QUEUE_NAME,
YarnConfiguration.DEFAULT_QUEUE_NAME);
dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName);
dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS,
- tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
+ dagAMConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS,
- tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
+ dagAMConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
// Hardcoding at AM level instead of setting per vertex till TEZ-2710 is available
dagAMConf.setIfUnset(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, "0.5");
removeUnwantedSettings(dagAMConf, true);
- return dagAMConf;
}
/**
diff --git a/test/org/apache/pig/tez/TestTezJobExecution.java b/test/org/apache/pig/tez/TestTezJobExecution.java
index c98ec81..97ede40 100644
--- a/test/org/apache/pig/tez/TestTezJobExecution.java
+++ b/test/org/apache/pig/tez/TestTezJobExecution.java
@@ -18,15 +18,25 @@
package org.apache.pig.tez;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigRunner;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.RoundRobinPartitioner;
+import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.test.Util;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
+import org.apache.pig.tools.pigstats.PigStats;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -39,12 +49,16 @@
private static final String TEST_DIR = Util.getTestDirectory(TestTezJobExecution.class);
+ private static final String INPUT_FILE = TEST_DIR + Path.SEPARATOR + "input";
private PigServer pigServer;
@BeforeClass
public static void oneTimeSetUp() throws Exception {
Util.deleteDirectory(new File(TEST_DIR));
new File(TEST_DIR).mkdirs();
+ Util.createLocalInputFile(INPUT_FILE, new String[] {
+ "1", "1", "1", "2", "2", "2"
+ });
}
@AfterClass
@@ -54,18 +68,14 @@
@Before
public void setUp() throws Exception {
- pigServer = new PigServer(Util.getLocalTestMode());
+ pigServer = new PigServer("tez_local");
}
@Test
public void testUnionParallelRoundRobinBatchSize() throws IOException {
- String input = TEST_DIR + Path.SEPARATOR + "input1";
String output = TEST_DIR + Path.SEPARATOR + "output1";
- Util.createInputFile(pigServer.getPigContext(), input, new String[] {
- "1", "1", "1", "2", "2", "2"
- });
- String query = "A = LOAD '" + input + "';"
- + "B = LOAD '" + input + "';"
+ String query = "A = LOAD '" + INPUT_FILE + "';"
+ + "B = LOAD '" + INPUT_FILE + "';"
+ "C = UNION A, B PARALLEL 2;"
+ "STORE C into '" + output + "';";
pigServer.getPigContext().getProperties().setProperty(
@@ -77,4 +87,100 @@
assertEquals("2\n2\n2\n2\n2\n2\n", part1);
}
+ @Test
+ public void testDAGDiscoveryDisabled() throws IOException {
+ String output1 = TEST_DIR + Path.SEPARATOR + "output-parallel";
+ String output2 = TEST_DIR + Path.SEPARATOR + "output-autoparallel";
+ String scriptFile = TEST_DIR + Path.SEPARATOR + "testDAGRecoveryDisable.pig";
+ String query = "A = LOAD '" + INPUT_FILE + "';"
+ + "B = GROUP A BY $0 PARALLEL 1;"
+ + "STORE B into '" + output1 + "';"
+ + "exec;"
+ + "C = LOAD '" + INPUT_FILE + "';"
+ + "D = GROUP C BY $0;"
+ + "STORE D into '" + output2 + "';";
+
+ Util.createLocalInputFile(scriptFile, new String[] {query});
+
+ String[] args = { "-x", "tez_local", scriptFile };
+
+ TestNotificationListener listener = new TestNotificationListener();
+ // Recovery is not disabled when there is auto parallelism. Should reuse AM application session
+ PigStats stats = PigRunner.run(args, listener);
+ assertTrue(stats.isSuccessful());
+ assertEquals(listener.getJobsStarted().size(), 1);
+
+ Util.deleteFile(pigServer.getPigContext(), output1);
+ Util.deleteFile(pigServer.getPigContext(), output2);
+
+ // Recovery is disabled when there is auto parallelism. Should use two different AMs
+ listener.reset();
+ args = new String[] {
+ "-D" + PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY + "=true",
+ "-x",
+ "tez_local",
+ scriptFile };
+ stats = PigRunner.run(args, listener);
+ assertTrue(stats.isSuccessful());
+ assertEquals(listener.getJobsStarted().size(), 2);
+ }
+
+
+ private static class TestNotificationListener implements PigProgressNotificationListener {
+
+ private Set<String> jobsStarted = new HashSet<String>();
+
+ public void reset() {
+ this.jobsStarted.clear();
+ }
+
+ public Set<String> getJobsStarted() {
+ return jobsStarted;
+ }
+
+ @Override
+ public void initialPlanNotification(String scriptId,
+ OperatorPlan<?> plan) {
+ }
+
+ @Override
+ public void launchStartedNotification(String scriptId,
+ int numJobsToLaunch) {
+ }
+
+ @Override
+ public void jobsSubmittedNotification(String scriptId,
+ int numJobsSubmitted) {
+ }
+
+ @Override
+ public void jobStartedNotification(String scriptId, String assignedJobId) {
+ jobsStarted.add(assignedJobId);
+ }
+
+ @Override
+ public void jobFinishedNotification(String scriptId, JobStats jobStats) {
+ }
+
+ @Override
+ public void jobFailedNotification(String scriptId, JobStats jobStats) {
+ }
+
+ @Override
+ public void outputCompletedNotification(String scriptId,
+ OutputStats outputStats) {
+ }
+
+ @Override
+ public void progressUpdatedNotification(String scriptId, int progress) {
+
+ }
+
+ @Override
+ public void launchCompletedNotification(String scriptId,
+ int numJobsSucceeded) {
+ }
+
+ }
+
}
diff --git a/test/org/apache/pig/tez/TestTezLauncher.java b/test/org/apache/pig/tez/TestTezLauncher.java
index d5aecab..9ffd8f8 100644
--- a/test/org/apache/pig/tez/TestTezLauncher.java
+++ b/test/org/apache/pig/tez/TestTezLauncher.java
@@ -23,7 +23,6 @@
import java.util.Arrays;
import java.util.Iterator;
-import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
@@ -35,6 +34,7 @@
import org.apache.pig.test.MiniGenericCluster;
import org.apache.pig.test.Util;
import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.tez.dag.api.TezConfiguration;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -124,11 +124,11 @@
@Test
public void testQueueName() throws Exception {
- Configuration conf = new Configuration();
+ TezConfiguration conf = new TezConfiguration();
conf.set("tez.queue.name", "special");
- conf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
+ MRToTezHelper.translateMRSettingsForTezAM(conf);
assertEquals(conf.get("tez.queue.name"), "special");
-
+
}
}