APEXCORE-733 implement the new logic using apex.dfsRootDirectory and impersonated user flag
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index 96f9daa..da613db 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -446,7 +446,7 @@
 
     // copy required jar files to dfs, to be localized for containers
     try (FileSystem fs = StramClientUtils.newFileSystemInstance(conf)) {
-      Path appsBasePath = new Path(StramClientUtils.getDTDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS);
+      Path appsBasePath = new Path(StramClientUtils.getApexDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS);
       Path appPath;
       String configuredAppPath = dag.getValue(LogicalPlan.APPLICATION_PATH);
       if (configuredAppPath == null) {
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java b/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java
index 669a445..a1ac8ca 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java
@@ -312,7 +312,7 @@
 
   public String getAppsRoot()
   {
-    return (defaultStramRoot == null) ? (StramClientUtils.getDTDFSRootDir(fileSystem, conf) + "/" + StramClientUtils.SUBDIR_APPS) : defaultStramRoot;
+    return (defaultStramRoot == null) ? (StramClientUtils.getApexDFSRootDir(fileSystem, conf) + "/" + StramClientUtils.SUBDIR_APPS) : defaultStramRoot;
   }
 
   public String getAppPath(String appId)
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
index 97420de..d4f0170 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
@@ -273,7 +273,7 @@
     if (originalAppId == null) {
       throw new AssertionError("Need original app id if launching without apa or appjar");
     }
-    Path appsBasePath = new Path(StramClientUtils.getDTDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS);
+    Path appsBasePath = new Path(StramClientUtils.getApexDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS);
     Path origAppPath = new Path(appsBasePath, originalAppId);
     StringWriter writer = new StringWriter();
     try (FSDataInputStream in = fs.open(new Path(origAppPath, "meta.json"))) {
@@ -550,7 +550,7 @@
       if (keytab != null) {
         Path localKeyTabPath = new Path(keytab);
         try (FileSystem fs = StramClientUtils.newFileSystemInstance(conf)) {
-          Path destPath = new Path(StramClientUtils.getDTDFSRootDir(fs, conf), localKeyTabPath.getName());
+          Path destPath = new Path(StramClientUtils.getApexDFSRootDir(fs, conf), localKeyTabPath.getName());
           if (!fs.exists(destPath)) {
             fs.copyFromLocalFile(false, false, localKeyTabPath, destPath);
           }
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index d9032e5..d8caa1e 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -103,6 +103,7 @@
 {
   public static final String DT_VERSION = StreamingApplication.DT_PREFIX + "version";
   public static final String DT_DFS_ROOT_DIR = StreamingApplication.DT_PREFIX + "dfsRootDirectory";
+  public static final String APEX_APP_DFS_ROOT_DIR = StreamingApplication.APEX_PREFIX + "app.dfsRootDirectory";
   public static final String DT_DFS_USER_NAME = "%USER_NAME%";
   public static final String DT_CONFIG_STATUS = StreamingApplication.DT_PREFIX + "configStatus";
   public static final String SUBDIR_APPS = "apps";
@@ -519,28 +520,94 @@
     }
   }
 
+  /**
+   * Helper function used by both getApexDFSRootDir and getDTDFSRootDir to process dfsRootDir
+   *
+   * @param fs   FileSystem object for HDFS file system
+   * @param conf  Configuration object
+   * @param dfsRootDir  value of dt.dfsRootDir or apex.app.dfsRootDir
+   * @param userShortName  current user short name (either login user or current user depending on impersonation settings)
+   * @param prependHomeDir  prepend user's home dir if dfsRootDir is relative path
+
+   * @return
+   */
+  private static Path evalDFSRootDir(FileSystem fs, Configuration conf, String dfsRootDir, String userShortName,
+      boolean prependHomeDir)
+  {
+    try {
+      if (userShortName != null && dfsRootDir.contains(DT_DFS_USER_NAME)) {
+        dfsRootDir = dfsRootDir.replace(DT_DFS_USER_NAME, userShortName);
+        conf.set(DT_DFS_ROOT_DIR, dfsRootDir);
+      }
+      URI uri = new URI(dfsRootDir);
+      if (uri.isAbsolute()) {
+        return new Path(uri);
+      }
+      if (userShortName != null && prependHomeDir && dfsRootDir.startsWith("/") == false) {
+        dfsRootDir = "/user/" + userShortName + "/" + dfsRootDir;
+      }
+    } catch (URISyntaxException ex) {
+      LOG.warn("{} is not a valid URI. Using the default filesystem to construct the path", dfsRootDir, ex);
+    }
+    return new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(), dfsRootDir);
+  }
+
+  private static String getDefaultRootFolder()
+  {
+    return "datatorrent";
+  }
+
+  /**
+   * This gets the DFS Root dir to be used at runtime by Apex applications as per the following logic:
+   * Value of apex.app.dfsRootDirectory is referred to as Apex-root-dir below.
+   * A "user" refers to either impersonating or impersonated user:
+   *    If apex.application.path.impersonated is true then use impersonated user else impersonating user.
+   *
+   * <ul>
+   * <li> if Apex-root-dir is blank then just call getDTDFSRootDir to get the old behavior
+   * <li> if Apex-root-dir value has %USER_NAME% in the string then replace it with the user's name, else use the absolute path as is.
+   * <li> if Apex-root-dir value is a relative path then append it to the user's home directory.
+   * </ul>
+   *
+   * @param fs FileSystem object for HDFS file system
+   * @param conf  Configuration object
+   * @return
+   * @throws IOException
+   */
+  public static Path getApexDFSRootDir(FileSystem fs, Configuration conf)
+  {
+    String apexDfsRootDir = conf.get(APEX_APP_DFS_ROOT_DIR);
+    boolean useImpersonated = conf.getBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false);
+    String userShortName = null;
+    if (useImpersonated) {
+      try {
+        userShortName = UserGroupInformation.getCurrentUser().getShortUserName();
+      } catch (IOException ex) {
+        LOG.warn("Error getting current/login user name {}", apexDfsRootDir, ex);
+      }
+    }
+    if (!useImpersonated || userShortName == null) {
+      return getDTDFSRootDir(fs, conf);
+    }
+    if (StringUtils.isBlank(apexDfsRootDir)) {
+      apexDfsRootDir = getDefaultRootFolder();
+    }
+    return evalDFSRootDir(fs, conf, apexDfsRootDir, userShortName, true);
+  }
+
   public static Path getDTDFSRootDir(FileSystem fs, Configuration conf)
   {
     String dfsRootDir = conf.get(DT_DFS_ROOT_DIR);
     if (StringUtils.isBlank(dfsRootDir)) {
-      return new Path(fs.getHomeDirectory(), "datatorrent");
-    } else {
-      try {
-        if (dfsRootDir.contains(DT_DFS_USER_NAME)) {
-          dfsRootDir = dfsRootDir.replace(DT_DFS_USER_NAME, UserGroupInformation.getLoginUser().getShortUserName());
-          conf.set(DT_DFS_ROOT_DIR, dfsRootDir);
-        }
-        URI uri = new URI(dfsRootDir);
-        if (uri.isAbsolute()) {
-          return new Path(uri);
-        }
-      } catch (IOException ex) {
-        LOG.warn("Error getting user login name {}", dfsRootDir, ex);
-      } catch (URISyntaxException ex) {
-        LOG.warn("{} is not a valid URI. Using the default filesystem to construct the path", dfsRootDir, ex);
-      }
-      return new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(), dfsRootDir);
+      return new Path(fs.getHomeDirectory(), getDefaultRootFolder());
     }
+    String userShortName = null;
+    try {
+      userShortName = UserGroupInformation.getLoginUser().getShortUserName();
+    } catch (IOException ex) {
+      LOG.warn("Error getting user login name {}", dfsRootDir, ex);
+    }
+    return evalDFSRootDir(fs, conf, dfsRootDir, userShortName, false);
   }
 
   public static Path getDTDFSConfigDir(FileSystem fs, Configuration conf)
@@ -828,7 +895,7 @@
     List<ApplicationReport> result = new ArrayList<>();
     List<ApplicationReport> applications = clientRMService.getApplications(Sets.newHashSet(StramClient.YARN_APPLICATION_TYPE, StramClient.YARN_APPLICATION_TYPE_DEPRECATED),
         EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.FINISHED, YarnApplicationState.KILLED));
-    Path appsBasePath = new Path(StramClientUtils.getDTDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS);
+    Path appsBasePath = new Path(StramClientUtils.getApexDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS);
     for (ApplicationReport ar : applications) {
       long finishTime = ar.getFinishTime();
       if (finishTime < finishedBefore) {
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
index 0c1d0c9..83aa781 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
@@ -51,6 +51,7 @@
   public static final String DT_AUTH_PREFIX = StreamingApplication.DT_PREFIX + "authentication.";
   public static final String DT_AUTH_PRINCIPAL = DT_AUTH_PREFIX + "principal";
   public static final String DT_AUTH_KEYTAB = DT_AUTH_PREFIX + "keytab";
+  public static final String DT_APP_PATH_IMPERSONATED = DT_AUTH_PREFIX + "impersonation.path.enable";
   private static String principal;
   private static String keytab;
 
diff --git a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
index 5d4c84b..2069bab 100644
--- a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
@@ -60,6 +60,7 @@
     File workspace;
     File sourceKeytab;
     File dfsDir;
+    File apexDfsDir;
 
     static final String principal = "username/group@domain";
 
@@ -82,6 +83,7 @@
           throw new RuntimeException(e);
         }
         dfsDir = new File(workspace, "dst");
+        apexDfsDir = new File(workspace, "adst");
         suppress(method(StramAppLauncher.class, "init"));
       }
 
@@ -138,6 +140,41 @@
           new File(dfsDir, sourceKeytab.getName()).getAbsolutePath()).toString(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
     }
 
+    @Test
+    public void testUserLoginTokenRefreshKeytabWithApexDFS() throws Exception
+    {
+      Configuration conf = new Configuration(false);
+      /*
+      spy(StramUserLogin.class);
+      when(StramUserLogin.getPrincipal()).thenReturn(principal);
+      when(StramUserLogin.getKeytab()).thenReturn(sourceKeytab.getPath());
+      */
+      StramUserLogin.authenticate(principal, sourceKeytab.getPath());
+      testDFSTokenPathWithApexDFS(conf);
+    }
+
+    @Test
+    public void testAuthPropTokenRefreshKeytabWithApexDFS() throws Exception
+    {
+      Configuration conf = new Configuration(false);
+      conf.set(StramUserLogin.DT_AUTH_PRINCIPAL, principal);
+      conf.set(StramUserLogin.DT_AUTH_KEYTAB, sourceKeytab.getPath());
+      StramUserLogin.authenticate(conf);
+      testDFSTokenPathWithApexDFS(conf);
+    }
+
+    private void testDFSTokenPathWithApexDFS(Configuration conf) throws Exception
+    {
+      FileSystem fs = FileSystem.newInstance(conf);
+      conf.set(StramClientUtils.DT_DFS_ROOT_DIR, dfsDir.getAbsolutePath());
+      conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, apexDfsDir.getAbsolutePath());
+      conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true); // needs to be true for APEX_APP_DFS_ROOT_DIR to be honored
+      LogicalPlan dag = applyTokenRefreshKeytab(fs, conf);
+      Assert.assertEquals("Token refresh principal", principal, dag.getValue(LogicalPlan.PRINCIPAL));
+      Assert.assertEquals("Token refresh keytab path", new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(),
+          new File(apexDfsDir, sourceKeytab.getName()).getAbsolutePath()).toString(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
+    }
+
     private LogicalPlan applyTokenRefreshKeytab(FileSystem fs, Configuration conf) throws Exception
     {
       LogicalPlan dag = new LogicalPlan();
diff --git a/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java b/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java
index 55b41d0..ed61efb 100644
--- a/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java
@@ -18,9 +18,11 @@
  */
 package com.datatorrent.stram.client;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.Properties;
 
@@ -29,8 +31,12 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
+import com.datatorrent.stram.security.StramUserLogin;
 import com.datatorrent.stram.util.ConfigUtils;
 
 
@@ -154,4 +160,268 @@
     Assert.assertEquals(getHostString("192.168.1.2") + ":8032", StramClientUtils.getSocketConnectString(addresses.get(1)));
   }
 
+  /**
+   * apex.dfsRootDirectory not set: legacy behavior of getDTDFSRootDir()
+   * @throws IOException
+   *
+   */
+  @Test
+  public void getApexDFSRootDirLegacy() throws IOException
+  {
+    Configuration conf = new YarnConfiguration(new Configuration(false));
+    conf.set(StramClientUtils.DT_DFS_ROOT_DIR, "/a/b/c");
+    conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false);
+
+    FileSystem fs = FileSystem.newInstance(conf);
+    Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+    Assert.assertEquals("file:/a/b/c", path.toString());
+  }
+
+  /**
+   * apex.dfsRootDirectory set: absolute path e.g. /x/y/z
+   * @throws IOException
+   *
+   */
+  @Test
+  public void getApexDFSRootDirAbsPath() throws IOException
+  {
+    Configuration conf = new YarnConfiguration(new Configuration(false));
+    conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "/x/y/z");
+    conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false);
+
+    FileSystem fs = FileSystem.newInstance(conf);
+    UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+    UserGroupInformation.setLoginUser(testUser);
+    Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+    Assert.assertEquals(fs.getHomeDirectory() + "/datatorrent", path.toString());
+  }
+
+  /**
+   * apex.dfsRootDirectory set: absolute path with scheme e.g. file:/p/q/r
+   * @throws IOException
+   *
+   */
+  @Test
+  public void getApexDFSRootDirScheme() throws IOException
+  {
+    Configuration conf = new YarnConfiguration(new Configuration(false));
+    conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "file:/p/q/r");
+    conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false);
+
+    FileSystem fs = FileSystem.newInstance(conf);
+    UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+    UserGroupInformation.setLoginUser(testUser);
+    Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+    Assert.assertEquals(fs.getHomeDirectory() + "/datatorrent", path.toString());
+  }
+
+  /**
+   * apex.dfsRootDirectory set: absolute path with variable %USER_NAME%
+   * @throws IOException
+   * @throws InterruptedException
+   *
+   */
+  @Test
+  public void getApexDFSRootDirWithVar() throws IOException, InterruptedException
+  {
+    final Configuration conf = new YarnConfiguration(new Configuration(false));
+    conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "/x/%USER_NAME%/z");
+    conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false);
+
+    final FileSystem fs = FileSystem.newInstance(conf);
+    UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+    UserGroupInformation.setLoginUser(testUser);
+    UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("impersonated", new String[]{""});
+
+    doAsUser.doAs(new PrivilegedExceptionAction<Void>()
+    {
+      @Override
+      public Void run() throws Exception
+      {
+        Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+        Assert.assertEquals(fs.getHomeDirectory() + "/datatorrent", path.toString());
+        return null;
+      }
+    });
+  }
+
+  /**
+   * apex.dfsRootDirectory set: absolute path with %USER_NAME% and scheme e.g. file:/x/%USER_NAME%/z
+   * @throws IOException
+   * @throws InterruptedException
+   *
+   */
+  @Test
+  public void getApexDFSRootDirWithSchemeAndVar() throws IOException, InterruptedException
+  {
+    final Configuration conf = new YarnConfiguration(new Configuration(false));
+    conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "file:/x/%USER_NAME%/z");
+    conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true);
+
+    final FileSystem fs = FileSystem.newInstance(conf);
+    UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+    UserGroupInformation.setLoginUser(testUser);
+    UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("impersonated", new String[]{""});
+
+    doAsUser.doAs(new PrivilegedExceptionAction<Void>()
+    {
+      @Override
+      public Void run() throws Exception
+      {
+        Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+        Assert.assertEquals("file:/x/impersonated/z", path.toString());
+        return null;
+      }
+    });
+  }
+
+  /**
+   * apex.dfsRootDirectory set: relative path
+   * @throws IOException
+   * @throws InterruptedException
+   *
+   */
+  @Test
+  public void getApexDFSRootDirRelPath() throws IOException, InterruptedException
+  {
+    final Configuration conf = new YarnConfiguration(new Configuration(false));
+    conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "apex");
+    conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false);
+
+    final FileSystem fs = FileSystem.newInstance(conf);
+    UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+    UserGroupInformation.setLoginUser(testUser);
+    UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("impersonated", new String[]{""});
+
+    doAsUser.doAs(new PrivilegedExceptionAction<Void>()
+    {
+      @Override
+      public Void run() throws Exception
+      {
+        Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+        Assert.assertEquals(fs.getHomeDirectory() + "/datatorrent", path.toString());
+        return null;
+      }
+    });
+  }
+
+  /**
+   * apex.dfsRootDirectory set: absolute path with %USER_NAME% and impersonation enabled
+   * @throws IOException
+   * @throws InterruptedException
+   *
+   */
+  @Test
+  public void getApexDFSRootDirAbsPathAndVar() throws IOException, InterruptedException
+  {
+    final Configuration conf = new YarnConfiguration(new Configuration(false));
+    conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "/x/%USER_NAME%/z");
+    conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true);
+
+    final FileSystem fs = FileSystem.newInstance(conf);
+    UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+    UserGroupInformation.setLoginUser(testUser);
+    UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("impersonated", new String[]{""});
+
+    doAsUser.doAs(new PrivilegedExceptionAction<Void>()
+    {
+      @Override
+      public Void run() throws Exception
+      {
+        Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+        Assert.assertEquals("file:/x/impersonated/z", path.toString());
+        return null;
+      }
+    });
+  }
+
+  /**
+   * apex.dfsRootDirectory set: relative path and impersonation enabled and doAS
+   * @throws IOException
+   * @throws InterruptedException
+   *
+   */
+  @Test
+  public void getApexDFSRootDirRelPathAndImpersonation() throws IOException, InterruptedException
+  {
+    final Configuration conf = new YarnConfiguration(new Configuration(false));
+    conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "apex");
+    conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true);
+
+    final FileSystem fs = FileSystem.newInstance(conf);
+    UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+    UserGroupInformation.setLoginUser(testUser);
+    UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("testUser2", new String[]{""});
+
+    doAsUser.doAs(new PrivilegedExceptionAction<Void>()
+    {
+      @Override
+      public Void run() throws Exception
+      {
+        Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+        Assert.assertEquals("file:/user/testUser2/apex", path.toString());
+        return null;
+      }
+    });
+  }
+
+  /**
+   * apex.dfsRootDirectory set: relative path blank and impersonation enabled and doAS
+   * @throws IOException
+   * @throws InterruptedException
+   *
+   */
+  @Test
+  public void getApexDFSRootDirBlankPathAndImpersonation() throws IOException, InterruptedException
+  {
+    final Configuration conf = new YarnConfiguration(new Configuration(false));
+    conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true);
+
+    final FileSystem fs = FileSystem.newInstance(conf);
+    UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+    UserGroupInformation.setLoginUser(testUser);
+    UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("testUser2", new String[]{""});
+
+    doAsUser.doAs(new PrivilegedExceptionAction<Void>()
+    {
+      @Override
+      public Void run() throws Exception
+      {
+        Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+        Assert.assertEquals("file:/user/testUser2/datatorrent", path.toString());
+        return null;
+      }
+    });
+  }
+
+  /**
+   * apex.dfsRootDirectory set: relative path having %USER_NAME% and impersonation enabled and doAS
+   * Make sure currentUser appears twice
+   * @throws IOException
+   * @throws InterruptedException
+   *
+   */
+  @Test
+  public void getApexDFSRootDirRelPathVarAndImpersonation() throws IOException, InterruptedException
+  {
+    final Configuration conf = new YarnConfiguration(new Configuration(false));
+    conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "apex/%USER_NAME%/xyz");
+    conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true);
+
+    final FileSystem fs = FileSystem.newInstance(conf);
+    UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+    UserGroupInformation.setLoginUser(testUser);
+    UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("testUser2", new String[]{""});
+
+    doAsUser.doAs(new PrivilegedExceptionAction<Void>()
+    {
+      @Override
+      public Void run() throws Exception
+      {
+        Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+        Assert.assertEquals("file:/user/testUser2/apex/testUser2/xyz", path.toString());
+        return null;
+      }
+    });
+  }
 }