diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 5d7aea3..adcae8a 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -71,20 +71,10 @@
 
   public static ConfigurationProto readUserSpecifiedTezConfiguration(String baseDir) throws
       IOException {
-    FileInputStream confPBBinaryStream = null;
-    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
-    try {
-      confPBBinaryStream =
-          new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME));
-      confProtoBuilder.mergeFrom(confPBBinaryStream);
-    } finally {
-      if (confPBBinaryStream != null) {
-        confPBBinaryStream.close();
-      }
+    File confPBFile = new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME);
+    try (FileInputStream fis = new FileInputStream(confPBFile)) {
+      return ConfigurationProto.parseFrom(fis);
     }
-
-    ConfigurationProto confProto = confProtoBuilder.build();
-    return confProto;
   }
 
   public static void addUserSpecifiedTezConfiguration(Configuration conf,
@@ -95,31 +85,6 @@
       }
     }
   }
-//
-//  public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws
-//      IOException {
-//    FileInputStream confPBBinaryStream = null;
-//    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
-//    try {
-//      confPBBinaryStream =
-//          new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME));
-//      confProtoBuilder.mergeFrom(confPBBinaryStream);
-//    } finally {
-//      if (confPBBinaryStream != null) {
-//        confPBBinaryStream.close();
-//      }
-//    }
-//
-//    ConfigurationProto confProto = confProtoBuilder.build();
-//
-//    List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
-//    if (kvPairList != null && !kvPairList.isEmpty()) {
-//      for (PlanKeyValuePair kvPair : kvPairList) {
-//        conf.set(kvPair.getKey(), kvPair.getValue());
-//      }
-//    }
-//  }
-
 
   public static byte[] compressBytes(byte[] inBytes) throws IOException {
     StopWatch sw = new StopWatch().start();
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 6baea48..140ada1 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -24,6 +24,8 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import javax.annotation.Nullable;
+
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -83,7 +85,6 @@
   @Override
   public void init(TezConfiguration tezConf, YarnConfiguration yarnConf) {
     this.conf = tezConf;
-    tezConf.set("fs.defaultFS", "file:///");
     // Tez libs already in the client's classpath
     this.conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
     this.conf.set(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, localModeDAGSchedulerClassName);
@@ -286,19 +287,34 @@
         try {
           ApplicationId appId = appContext.getApplicationId();
 
-          // Set up working directory for DAGAppMaster
+          // Set up working directory for DAGAppMaster.
+          // The staging directory may be on the default file system, which may or may not
+          // be the local FS. For example, when using testing Hive against a pseudo-distributed
+          // cluster, it's useful for the default FS to be HDFS. Hive then puts its scratch
+          // directories on HDFS, and sets the Tez staging directory to be the session's
+          // scratch directory.
+          //
+          // To handle this case, we need to copy over the staging data back onto the
+          // local file system, where the rest of the Tez Child code expects it.
+          //
+          // NOTE: we base the local working directory path off of the staging path, even
+          // though it might be on a different file system. Typically they're both in a
+          // path starting with /tmp, but in the future we may want to use a different
+          // temp directory locally.
           Path staging = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString());
-          Path userDir = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString()+"_wd");
+          FileSystem stagingFs = staging.getFileSystem(conf);
+
+          FileSystem localFs = FileSystem.getLocal(conf);
+          Path userDir = localFs.makeQualified(new Path(staging.toUri().getPath() + "_wd"));
           LOG.info("Using working directory: " + userDir.toUri().getPath());
 
-          FileSystem fs = FileSystem.get(conf);
           // copy data from staging directory to working directory to simulate the resource localizing
-          FileUtil.copy(fs, staging, fs, userDir, false, conf);
+          FileUtil.copy(stagingFs, staging, localFs, userDir, false, conf);
           // Prepare Environment
           Path logDir = new Path(userDir, "localmode-log-dir");
           Path localDir = new Path(userDir, "localmode-local-dir");
-          fs.mkdirs(logDir);
-          fs.mkdirs(localDir);
+          localFs.mkdirs(logDir);
+          localFs.mkdirs(localDir);
 
           UserGroupInformation.setConfiguration(conf);
           // Add session specific credentials to the AM credentials.
@@ -357,30 +373,11 @@
 
     // Read in additional information about external services
     AMPluginDescriptorProto amPluginDescriptorProto =
-        getPluginDescriptorInfo(conf, applicationAttemptId.getApplicationId().toString());
-
+        TezUtilsInternal.readUserSpecifiedTezConfiguration(userDir)
+            .getAmPluginDescriptor();
 
     return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
         new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs,
         versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto);
   }
-
-  private AMPluginDescriptorProto getPluginDescriptorInfo(Configuration conf,
-                                                          String applicationIdString) throws
-      IOException {
-    Path tezSysStagingPath = TezCommonUtils
-        .getTezSystemStagingPath(conf, applicationIdString);
-    // Remove the filesystem qualifier.
-    String unqualifiedPath = tezSysStagingPath.toUri().getPath();
-
-    DAGProtos.ConfigurationProto confProto =
-        TezUtilsInternal
-            .readUserSpecifiedTezConfiguration(unqualifiedPath);
-    AMPluginDescriptorProto amPluginDescriptorProto = null;
-    if (confProto.hasAmPluginDescriptor()) {
-      amPluginDescriptorProto = confProto.getAmPluginDescriptor();
-    }
-    return amPluginDescriptorProto;
-  }
-
 }
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index 6b626b1..cb52105 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -276,8 +276,7 @@
 
   protected void printExtraOptionsUsage(PrintStream ps) {
     ps.println("Tez example extra options supported are");
-    // TODO TEZ-1348 make it able to access dfs in tez local mode
-    ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, currently it can only access local file system in tez local mode,"
+    ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, "
         + " run it in distributed mode without this option");
     ps.println("-" + DISABLE_SPLIT_GROUPING + "\t\t disable split grouping for MRInput,"
         + " enable split grouping without this option.");
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
index 2a5b65f..318349c 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
@@ -20,12 +20,16 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -43,23 +47,73 @@
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.junit.Assert.*;
 
+@RunWith(Parameterized.class)
 public class TestLocalMode {
 
   private static final File TEST_DIR = new File(
       System.getProperty("test.build.data",
           System.getProperty("java.io.tmpdir")), "TestLocalMode-tez-localmode");
 
+  private static MiniDFSCluster dfsCluster;
+  private static FileSystem remoteFs;
+
+  @Parameterized.Parameter
+  public boolean useDfs;
+
+  @Parameterized.Parameters(name = "useDFS:{0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{{ false }, { true }});
+  }
+
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    try {
+      Configuration conf = new Configuration();
+      dfsCluster =
+          new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true)
+              .racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+  }
+
+  @AfterClass
+  public static void afterClass() throws InterruptedException {
+    if (dfsCluster != null) {
+      try {
+        dfsCluster.shutdown();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  private TezConfiguration createConf() {
+    TezConfiguration conf = new TezConfiguration();
+    conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    if (useDfs) {
+      conf.set("fs.defaultFS", remoteFs.getUri().toString());
+    } else {
+      conf.set("fs.defaultFS", "file:///");
+    }
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    return conf;
+  }
+
   @Test(timeout = 30000)
   public void testMultipleClientsWithSession() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = new TezConfiguration();
-    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf1.set("fs.defaultFS", "file:///");
-    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf1 = createConf();
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, true);
     tezClient1.start();
 
@@ -72,11 +126,7 @@
     dagClient1.close();
     tezClient1.stop();
 
-
-    TezConfiguration tezConf2 = new TezConfiguration();
-    tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf2.set("fs.defaultFS", "file:///");
-    tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf2 = createConf();
     DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
     TezClient tezClient2 = TezClient.create("commonName", tezConf2, true);
     tezClient2.start();
@@ -91,10 +141,7 @@
   @Test(timeout = 10000)
   public void testMultipleClientsWithoutSession() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = new TezConfiguration();
-    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf1.set("fs.defaultFS", "file:///");
-    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf1 = createConf();
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
     tezClient1.start();
 
@@ -108,10 +155,7 @@
     tezClient1.stop();
 
 
-    TezConfiguration tezConf2 = new TezConfiguration();
-    tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf2.set("fs.defaultFS", "file:///");
-    tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf2 = createConf();
     DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
     TezClient tezClient2 = TezClient.create("commonName", tezConf2, false);
     tezClient2.start();
@@ -126,10 +170,7 @@
   @Test(timeout = 20000)
   public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = new TezConfiguration();
-    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf1.set("fs.defaultFS", "file:///");
-    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf1 = createConf();
     // Run in non-session mode so that the AM terminates
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
     tezClient1.start();
@@ -150,10 +191,7 @@
   @Test(timeout = 20000)
   public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = new TezConfiguration();
-    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf1.set("fs.defaultFS", "file:///");
-    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf1 = createConf();
     // Run in non-session mode so that the AM terminates
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
     tezClient1.start();
@@ -211,10 +249,7 @@
     String[] outputPaths =  new String[dags];
     DAGClient[] dagClients = new DAGClient[dags];
 
-    TezConfiguration tezConf = new TezConfiguration();
-    tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf.set("fs.defaultFS", "file:///");
-    tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf = createConf();
     TezClient tezClient = TezClient.create("testMultiDAGOnSession", tezConf, true);
     tezClient.start();
 
