TEZ-1348. Allow Tez local mode to run against filesystems other than
local FS. (Todd Lipcon via sseth)
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 5d7aea3..adcae8a 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -71,20 +71,10 @@
public static ConfigurationProto readUserSpecifiedTezConfiguration(String baseDir) throws
IOException {
- FileInputStream confPBBinaryStream = null;
- ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
- try {
- confPBBinaryStream =
- new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME));
- confProtoBuilder.mergeFrom(confPBBinaryStream);
- } finally {
- if (confPBBinaryStream != null) {
- confPBBinaryStream.close();
- }
+ File confPBFile = new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME);
+ try (FileInputStream fis = new FileInputStream(confPBFile)) {
+ return ConfigurationProto.parseFrom(fis);
}
-
- ConfigurationProto confProto = confProtoBuilder.build();
- return confProto;
}
public static void addUserSpecifiedTezConfiguration(Configuration conf,
@@ -95,31 +85,6 @@
}
}
}
-//
-// public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws
-// IOException {
-// FileInputStream confPBBinaryStream = null;
-// ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
-// try {
-// confPBBinaryStream =
-// new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME));
-// confProtoBuilder.mergeFrom(confPBBinaryStream);
-// } finally {
-// if (confPBBinaryStream != null) {
-// confPBBinaryStream.close();
-// }
-// }
-//
-// ConfigurationProto confProto = confProtoBuilder.build();
-//
-// List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
-// if (kvPairList != null && !kvPairList.isEmpty()) {
-// for (PlanKeyValuePair kvPair : kvPairList) {
-// conf.set(kvPair.getKey(), kvPair.getValue());
-// }
-// }
-// }
-
public static byte[] compressBytes(byte[] inBytes) throws IOException {
StopWatch sw = new StopWatch().start();
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 6baea48..140ada1 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -24,6 +24,8 @@
import java.nio.ByteBuffer;
import java.util.List;
+import javax.annotation.Nullable;
+
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -83,7 +85,6 @@
@Override
public void init(TezConfiguration tezConf, YarnConfiguration yarnConf) {
this.conf = tezConf;
- tezConf.set("fs.defaultFS", "file:///");
// Tez libs already in the client's classpath
this.conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
this.conf.set(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, localModeDAGSchedulerClassName);
@@ -286,19 +287,34 @@
try {
ApplicationId appId = appContext.getApplicationId();
- // Set up working directory for DAGAppMaster
+ // Set up working directory for DAGAppMaster.
+ // The staging directory may be on the default file system, which may or may not
+ // be the local FS. For example, when using testing Hive against a pseudo-distributed
+ // cluster, it's useful for the default FS to be HDFS. Hive then puts its scratch
+ // directories on HDFS, and sets the Tez staging directory to be the session's
+ // scratch directory.
+ //
+ // To handle this case, we need to copy over the staging data back onto the
+ // local file system, where the rest of the Tez Child code expects it.
+ //
+ // NOTE: we base the local working directory path off of the staging path, even
+ // though it might be on a different file system. Typically they're both in a
+ // path starting with /tmp, but in the future we may want to use a different
+ // temp directory locally.
Path staging = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString());
- Path userDir = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString()+"_wd");
+ FileSystem stagingFs = staging.getFileSystem(conf);
+
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Path userDir = localFs.makeQualified(new Path(staging.toUri().getPath() + "_wd"));
LOG.info("Using working directory: " + userDir.toUri().getPath());
- FileSystem fs = FileSystem.get(conf);
// copy data from staging directory to working directory to simulate the resource localizing
- FileUtil.copy(fs, staging, fs, userDir, false, conf);
+ FileUtil.copy(stagingFs, staging, localFs, userDir, false, conf);
// Prepare Environment
Path logDir = new Path(userDir, "localmode-log-dir");
Path localDir = new Path(userDir, "localmode-local-dir");
- fs.mkdirs(logDir);
- fs.mkdirs(localDir);
+ localFs.mkdirs(logDir);
+ localFs.mkdirs(localDir);
UserGroupInformation.setConfiguration(conf);
// Add session specific credentials to the AM credentials.
@@ -357,30 +373,11 @@
// Read in additional information about external services
AMPluginDescriptorProto amPluginDescriptorProto =
- getPluginDescriptorInfo(conf, applicationAttemptId.getApplicationId().toString());
-
+ TezUtilsInternal.readUserSpecifiedTezConfiguration(userDir)
+ .getAmPluginDescriptor();
return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs,
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto);
}
-
- private AMPluginDescriptorProto getPluginDescriptorInfo(Configuration conf,
- String applicationIdString) throws
- IOException {
- Path tezSysStagingPath = TezCommonUtils
- .getTezSystemStagingPath(conf, applicationIdString);
- // Remove the filesystem qualifier.
- String unqualifiedPath = tezSysStagingPath.toUri().getPath();
-
- DAGProtos.ConfigurationProto confProto =
- TezUtilsInternal
- .readUserSpecifiedTezConfiguration(unqualifiedPath);
- AMPluginDescriptorProto amPluginDescriptorProto = null;
- if (confProto.hasAmPluginDescriptor()) {
- amPluginDescriptorProto = confProto.getAmPluginDescriptor();
- }
- return amPluginDescriptorProto;
- }
-
}
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index 6b626b1..cb52105 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -276,8 +276,7 @@
protected void printExtraOptionsUsage(PrintStream ps) {
ps.println("Tez example extra options supported are");
- // TODO TEZ-1348 make it able to access dfs in tez local mode
- ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, currently it can only access local file system in tez local mode,"
+ ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, "
+ " run it in distributed mode without this option");
ps.println("-" + DISABLE_SPLIT_GROUPING + "\t\t disable split grouping for MRInput,"
+ " enable split grouping without this option.");
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
index 2a5b65f..318349c 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
@@ -20,12 +20,16 @@
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -43,23 +47,73 @@
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.processor.SleepProcessor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import static org.junit.Assert.*;
+@RunWith(Parameterized.class)
public class TestLocalMode {
private static final File TEST_DIR = new File(
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")), "TestLocalMode-tez-localmode");
+ private static MiniDFSCluster dfsCluster;
+ private static FileSystem remoteFs;
+
+ @Parameterized.Parameter
+ public boolean useDfs;
+
+ @Parameterized.Parameters(name = "useDFS:{0}")
+ public static Collection<Object[]> params() {
+ return Arrays.asList(new Object[][]{{ false }, { true }});
+ }
+
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ try {
+ Configuration conf = new Configuration();
+ dfsCluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true)
+ .racks(null).build();
+ remoteFs = dfsCluster.getFileSystem();
+ } catch (IOException io) {
+ throw new RuntimeException("problem starting mini dfs cluster", io);
+ }
+ }
+
+ @AfterClass
+ public static void afterClass() throws InterruptedException {
+ if (dfsCluster != null) {
+ try {
+ dfsCluster.shutdown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private TezConfiguration createConf() {
+ TezConfiguration conf = new TezConfiguration();
+ conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ if (useDfs) {
+ conf.set("fs.defaultFS", remoteFs.getUri().toString());
+ } else {
+ conf.set("fs.defaultFS", "file:///");
+ }
+ conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+ return conf;
+ }
+
@Test(timeout = 30000)
public void testMultipleClientsWithSession() throws TezException, InterruptedException,
IOException {
- TezConfiguration tezConf1 = new TezConfiguration();
- tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
- tezConf1.set("fs.defaultFS", "file:///");
- tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+ TezConfiguration tezConf1 = createConf();
TezClient tezClient1 = TezClient.create("commonName", tezConf1, true);
tezClient1.start();
@@ -72,11 +126,7 @@
dagClient1.close();
tezClient1.stop();
-
- TezConfiguration tezConf2 = new TezConfiguration();
- tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
- tezConf2.set("fs.defaultFS", "file:///");
- tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+ TezConfiguration tezConf2 = createConf();
DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
TezClient tezClient2 = TezClient.create("commonName", tezConf2, true);
tezClient2.start();
@@ -91,10 +141,7 @@
@Test(timeout = 10000)
public void testMultipleClientsWithoutSession() throws TezException, InterruptedException,
IOException {
- TezConfiguration tezConf1 = new TezConfiguration();
- tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
- tezConf1.set("fs.defaultFS", "file:///");
- tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+ TezConfiguration tezConf1 = createConf();
TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
tezClient1.start();
@@ -108,10 +155,7 @@
tezClient1.stop();
- TezConfiguration tezConf2 = new TezConfiguration();
- tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
- tezConf2.set("fs.defaultFS", "file:///");
- tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+ TezConfiguration tezConf2 = createConf();
DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
TezClient tezClient2 = TezClient.create("commonName", tezConf2, false);
tezClient2.start();
@@ -126,10 +170,7 @@
@Test(timeout = 20000)
public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException,
IOException {
- TezConfiguration tezConf1 = new TezConfiguration();
- tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
- tezConf1.set("fs.defaultFS", "file:///");
- tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+ TezConfiguration tezConf1 = createConf();
// Run in non-session mode so that the AM terminates
TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
tezClient1.start();
@@ -150,10 +191,7 @@
@Test(timeout = 20000)
public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException,
IOException {
- TezConfiguration tezConf1 = new TezConfiguration();
- tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
- tezConf1.set("fs.defaultFS", "file:///");
- tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+ TezConfiguration tezConf1 = createConf();
// Run in non-session mode so that the AM terminates
TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
tezClient1.start();
@@ -211,10 +249,7 @@
String[] outputPaths = new String[dags];
DAGClient[] dagClients = new DAGClient[dags];
- TezConfiguration tezConf = new TezConfiguration();
- tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
- tezConf.set("fs.defaultFS", "file:///");
- tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+ TezConfiguration tezConf = createConf();
TezClient tezClient = TezClient.create("testMultiDAGOnSession", tezConf, true);
tezClient.start();