APEXCORE-733 implement the new logic using apex.dfsRootDirectory and impersonated user flag
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index 96f9daa..da613db 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -446,7 +446,7 @@
// copy required jar files to dfs, to be localized for containers
try (FileSystem fs = StramClientUtils.newFileSystemInstance(conf)) {
- Path appsBasePath = new Path(StramClientUtils.getDTDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS);
+ Path appsBasePath = new Path(StramClientUtils.getApexDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS);
Path appPath;
String configuredAppPath = dag.getValue(LogicalPlan.APPLICATION_PATH);
if (configuredAppPath == null) {
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java b/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java
index 669a445..a1ac8ca 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java
@@ -312,7 +312,7 @@
public String getAppsRoot()
{
- return (defaultStramRoot == null) ? (StramClientUtils.getDTDFSRootDir(fileSystem, conf) + "/" + StramClientUtils.SUBDIR_APPS) : defaultStramRoot;
+ return (defaultStramRoot == null) ? (StramClientUtils.getApexDFSRootDir(fileSystem, conf) + "/" + StramClientUtils.SUBDIR_APPS) : defaultStramRoot;
}
public String getAppPath(String appId)
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
index 97420de..d4f0170 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
@@ -273,7 +273,7 @@
if (originalAppId == null) {
throw new AssertionError("Need original app id if launching without apa or appjar");
}
- Path appsBasePath = new Path(StramClientUtils.getDTDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS);
+ Path appsBasePath = new Path(StramClientUtils.getApexDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS);
Path origAppPath = new Path(appsBasePath, originalAppId);
StringWriter writer = new StringWriter();
try (FSDataInputStream in = fs.open(new Path(origAppPath, "meta.json"))) {
@@ -550,7 +550,7 @@
if (keytab != null) {
Path localKeyTabPath = new Path(keytab);
try (FileSystem fs = StramClientUtils.newFileSystemInstance(conf)) {
- Path destPath = new Path(StramClientUtils.getDTDFSRootDir(fs, conf), localKeyTabPath.getName());
+ Path destPath = new Path(StramClientUtils.getApexDFSRootDir(fs, conf), localKeyTabPath.getName());
if (!fs.exists(destPath)) {
fs.copyFromLocalFile(false, false, localKeyTabPath, destPath);
}
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index d9032e5..d8caa1e 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -103,6 +103,7 @@
{
public static final String DT_VERSION = StreamingApplication.DT_PREFIX + "version";
public static final String DT_DFS_ROOT_DIR = StreamingApplication.DT_PREFIX + "dfsRootDirectory";
+ public static final String APEX_APP_DFS_ROOT_DIR = StreamingApplication.APEX_PREFIX + "app.dfsRootDirectory";
public static final String DT_DFS_USER_NAME = "%USER_NAME%";
public static final String DT_CONFIG_STATUS = StreamingApplication.DT_PREFIX + "configStatus";
public static final String SUBDIR_APPS = "apps";
@@ -519,28 +520,94 @@
}
}
+ /**
+ * Helper function used by both getApexDFSRootDir and getDTDFSRootDir to process dfsRootDir
+ *
+ * @param fs FileSystem object for HDFS file system
+ * @param conf Configuration object
+ * @param dfsRootDir value of dt.dfsRootDir or apex.app.dfsRootDir
+ * @param userShortName current user short name (either login user or current user depending on impersonation settings)
+ * @param prependHomeDir prepend user's home dir if dfsRootDir is relative path
+
+ * @return
+ */
+ private static Path evalDFSRootDir(FileSystem fs, Configuration conf, String dfsRootDir, String userShortName,
+ boolean prependHomeDir)
+ {
+ try {
+ if (userShortName != null && dfsRootDir.contains(DT_DFS_USER_NAME)) {
+ dfsRootDir = dfsRootDir.replace(DT_DFS_USER_NAME, userShortName);
+ conf.set(DT_DFS_ROOT_DIR, dfsRootDir);
+ }
+ URI uri = new URI(dfsRootDir);
+ if (uri.isAbsolute()) {
+ return new Path(uri);
+ }
+ if (userShortName != null && prependHomeDir && dfsRootDir.startsWith("/") == false) {
+ dfsRootDir = "/user/" + userShortName + "/" + dfsRootDir;
+ }
+ } catch (URISyntaxException ex) {
+ LOG.warn("{} is not a valid URI. Using the default filesystem to construct the path", dfsRootDir, ex);
+ }
+ return new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(), dfsRootDir);
+ }
+
+ private static String getDefaultRootFolder()
+ {
+ return "datatorrent";
+ }
+
+ /**
+ * This gets the DFS Root dir to be used at runtime by Apex applications as per the following logic:
+ * Value of apex.app.dfsRootDirectory is referred to as Apex-root-dir below.
+ * A "user" refers to either impersonating or impersonated user:
+ * If apex.application.path.impersonated is true then use impersonated user else impersonating user.
+ *
+ * <ul>
+ * <li> if Apex-root-dir is blank then just call getDTDFSRootDir to get the old behavior
+ * <li> if Apex-root-dir value has %USER_NAME% in the string then replace it with the user's name, else use the absolute path as is.
+ * <li> if Apex-root-dir value is a relative path then append it to the user's home directory.
+ * </ul>
+ *
+ * @param fs FileSystem object for HDFS file system
+ * @param conf Configuration object
+ * @return
+ * @throws IOException
+ */
+ public static Path getApexDFSRootDir(FileSystem fs, Configuration conf)
+ {
+ String apexDfsRootDir = conf.get(APEX_APP_DFS_ROOT_DIR);
+ boolean useImpersonated = conf.getBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false);
+ String userShortName = null;
+ if (useImpersonated) {
+ try {
+ userShortName = UserGroupInformation.getCurrentUser().getShortUserName();
+ } catch (IOException ex) {
+ LOG.warn("Error getting current/login user name {}", apexDfsRootDir, ex);
+ }
+ }
+ if (!useImpersonated || userShortName == null) {
+ return getDTDFSRootDir(fs, conf);
+ }
+ if (StringUtils.isBlank(apexDfsRootDir)) {
+ apexDfsRootDir = getDefaultRootFolder();
+ }
+ return evalDFSRootDir(fs, conf, apexDfsRootDir, userShortName, true);
+ }
+
public static Path getDTDFSRootDir(FileSystem fs, Configuration conf)
{
String dfsRootDir = conf.get(DT_DFS_ROOT_DIR);
if (StringUtils.isBlank(dfsRootDir)) {
- return new Path(fs.getHomeDirectory(), "datatorrent");
- } else {
- try {
- if (dfsRootDir.contains(DT_DFS_USER_NAME)) {
- dfsRootDir = dfsRootDir.replace(DT_DFS_USER_NAME, UserGroupInformation.getLoginUser().getShortUserName());
- conf.set(DT_DFS_ROOT_DIR, dfsRootDir);
- }
- URI uri = new URI(dfsRootDir);
- if (uri.isAbsolute()) {
- return new Path(uri);
- }
- } catch (IOException ex) {
- LOG.warn("Error getting user login name {}", dfsRootDir, ex);
- } catch (URISyntaxException ex) {
- LOG.warn("{} is not a valid URI. Using the default filesystem to construct the path", dfsRootDir, ex);
- }
- return new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(), dfsRootDir);
+ return new Path(fs.getHomeDirectory(), getDefaultRootFolder());
}
+ String userShortName = null;
+ try {
+ userShortName = UserGroupInformation.getLoginUser().getShortUserName();
+ } catch (IOException ex) {
+ LOG.warn("Error getting user login name {}", dfsRootDir, ex);
+ }
+ return evalDFSRootDir(fs, conf, dfsRootDir, userShortName, false);
}
public static Path getDTDFSConfigDir(FileSystem fs, Configuration conf)
@@ -828,7 +895,7 @@
List<ApplicationReport> result = new ArrayList<>();
List<ApplicationReport> applications = clientRMService.getApplications(Sets.newHashSet(StramClient.YARN_APPLICATION_TYPE, StramClient.YARN_APPLICATION_TYPE_DEPRECATED),
EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.FINISHED, YarnApplicationState.KILLED));
- Path appsBasePath = new Path(StramClientUtils.getDTDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS);
+ Path appsBasePath = new Path(StramClientUtils.getApexDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS);
for (ApplicationReport ar : applications) {
long finishTime = ar.getFinishTime();
if (finishTime < finishedBefore) {
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
index 0c1d0c9..83aa781 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
@@ -51,6 +51,7 @@
public static final String DT_AUTH_PREFIX = StreamingApplication.DT_PREFIX + "authentication.";
public static final String DT_AUTH_PRINCIPAL = DT_AUTH_PREFIX + "principal";
public static final String DT_AUTH_KEYTAB = DT_AUTH_PREFIX + "keytab";
+ public static final String DT_APP_PATH_IMPERSONATED = DT_AUTH_PREFIX + "impersonation.path.enable";
private static String principal;
private static String keytab;
diff --git a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
index 5d4c84b..2069bab 100644
--- a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
@@ -60,6 +60,7 @@
File workspace;
File sourceKeytab;
File dfsDir;
+ File apexDfsDir;
static final String principal = "username/group@domain";
@@ -82,6 +83,7 @@
throw new RuntimeException(e);
}
dfsDir = new File(workspace, "dst");
+ apexDfsDir = new File(workspace, "adst");
suppress(method(StramAppLauncher.class, "init"));
}
@@ -138,6 +140,41 @@
new File(dfsDir, sourceKeytab.getName()).getAbsolutePath()).toString(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
}
+ @Test
+ public void testUserLoginTokenRefreshKeytabWithApexDFS() throws Exception
+ {
+ Configuration conf = new Configuration(false);
+ /*
+ spy(StramUserLogin.class);
+ when(StramUserLogin.getPrincipal()).thenReturn(principal);
+ when(StramUserLogin.getKeytab()).thenReturn(sourceKeytab.getPath());
+ */
+ StramUserLogin.authenticate(principal, sourceKeytab.getPath());
+ testDFSTokenPathWithApexDFS(conf);
+ }
+
+ @Test
+ public void testAuthPropTokenRefreshKeytabWithApexDFS() throws Exception
+ {
+ Configuration conf = new Configuration(false);
+ conf.set(StramUserLogin.DT_AUTH_PRINCIPAL, principal);
+ conf.set(StramUserLogin.DT_AUTH_KEYTAB, sourceKeytab.getPath());
+ StramUserLogin.authenticate(conf);
+ testDFSTokenPathWithApexDFS(conf);
+ }
+
+ private void testDFSTokenPathWithApexDFS(Configuration conf) throws Exception
+ {
+ FileSystem fs = FileSystem.newInstance(conf);
+ conf.set(StramClientUtils.DT_DFS_ROOT_DIR, dfsDir.getAbsolutePath());
+ conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, apexDfsDir.getAbsolutePath());
+ conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true); // needs to be true for APEX_APP_DFS_ROOT_DIR to be honored
+ LogicalPlan dag = applyTokenRefreshKeytab(fs, conf);
+ Assert.assertEquals("Token refresh principal", principal, dag.getValue(LogicalPlan.PRINCIPAL));
+ Assert.assertEquals("Token refresh keytab path", new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(),
+ new File(apexDfsDir, sourceKeytab.getName()).getAbsolutePath()).toString(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
+ }
+
private LogicalPlan applyTokenRefreshKeytab(FileSystem fs, Configuration conf) throws Exception
{
LogicalPlan dag = new LogicalPlan();
diff --git a/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java b/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java
index 55b41d0..ed61efb 100644
--- a/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java
@@ -18,9 +18,11 @@
*/
package com.datatorrent.stram.client;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Properties;
@@ -29,8 +31,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import com.datatorrent.stram.security.StramUserLogin;
import com.datatorrent.stram.util.ConfigUtils;
@@ -154,4 +160,268 @@
Assert.assertEquals(getHostString("192.168.1.2") + ":8032", StramClientUtils.getSocketConnectString(addresses.get(1)));
}
+ /**
+ * apex.dfsRootDirectory not set: legacy behavior of getDTDFSRootDir()
+ * @throws IOException
+ *
+ */
+ @Test
+ public void getApexDFSRootDirLegacy() throws IOException
+ {
+ Configuration conf = new YarnConfiguration(new Configuration(false));
+ conf.set(StramClientUtils.DT_DFS_ROOT_DIR, "/a/b/c");
+ conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false);
+
+ FileSystem fs = FileSystem.newInstance(conf);
+ Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+ Assert.assertEquals("file:/a/b/c", path.toString());
+ }
+
+ /**
+ * apex.dfsRootDirectory set: absolute path e.g. /x/y/z
+ * @throws IOException
+ *
+ */
+ @Test
+ public void getApexDFSRootDirAbsPath() throws IOException
+ {
+ Configuration conf = new YarnConfiguration(new Configuration(false));
+ conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "/x/y/z");
+ conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false);
+
+ FileSystem fs = FileSystem.newInstance(conf);
+ UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+ UserGroupInformation.setLoginUser(testUser);
+ Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+ Assert.assertEquals(fs.getHomeDirectory() + "/datatorrent", path.toString());
+ }
+
+ /**
+ * apex.dfsRootDirectory set: absolute path with scheme e.g. file:/p/q/r
+ * @throws IOException
+ *
+ */
+ @Test
+ public void getApexDFSRootDirScheme() throws IOException
+ {
+ Configuration conf = new YarnConfiguration(new Configuration(false));
+ conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "file:/p/q/r");
+ conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false);
+
+ FileSystem fs = FileSystem.newInstance(conf);
+ UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+ UserGroupInformation.setLoginUser(testUser);
+ Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+ Assert.assertEquals(fs.getHomeDirectory() + "/datatorrent", path.toString());
+ }
+
+ /**
+ * apex.dfsRootDirectory set: absolute path with variable %USER_NAME%
+ * @throws IOException
+ * @throws InterruptedException
+ *
+ */
+ @Test
+ public void getApexDFSRootDirWithVar() throws IOException, InterruptedException
+ {
+ final Configuration conf = new YarnConfiguration(new Configuration(false));
+ conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "/x/%USER_NAME%/z");
+ conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false);
+
+ final FileSystem fs = FileSystem.newInstance(conf);
+ UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+ UserGroupInformation.setLoginUser(testUser);
+ UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("impersonated", new String[]{""});
+
+ doAsUser.doAs(new PrivilegedExceptionAction<Void>()
+ {
+ @Override
+ public Void run() throws Exception
+ {
+ Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+ Assert.assertEquals(fs.getHomeDirectory() + "/datatorrent", path.toString());
+ return null;
+ }
+ });
+ }
+
+ /**
+ * apex.dfsRootDirectory set: absolute path with %USER_NAME% and scheme e.g. file:/x/%USER_NAME%/z
+ * @throws IOException
+ * @throws InterruptedException
+ *
+ */
+ @Test
+ public void getApexDFSRootDirWithSchemeAndVar() throws IOException, InterruptedException
+ {
+ final Configuration conf = new YarnConfiguration(new Configuration(false));
+ conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "file:/x/%USER_NAME%/z");
+ conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true);
+
+ final FileSystem fs = FileSystem.newInstance(conf);
+ UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+ UserGroupInformation.setLoginUser(testUser);
+ UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("impersonated", new String[]{""});
+
+ doAsUser.doAs(new PrivilegedExceptionAction<Void>()
+ {
+ @Override
+ public Void run() throws Exception
+ {
+ Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+ Assert.assertEquals("file:/x/impersonated/z", path.toString());
+ return null;
+ }
+ });
+ }
+
+ /**
+ * apex.dfsRootDirectory set: relative path
+ * @throws IOException
+ * @throws InterruptedException
+ *
+ */
+ @Test
+ public void getApexDFSRootDirRelPath() throws IOException, InterruptedException
+ {
+ final Configuration conf = new YarnConfiguration(new Configuration(false));
+ conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "apex");
+ conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false);
+
+ final FileSystem fs = FileSystem.newInstance(conf);
+ UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+ UserGroupInformation.setLoginUser(testUser);
+ UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("impersonated", new String[]{""});
+
+ doAsUser.doAs(new PrivilegedExceptionAction<Void>()
+ {
+ @Override
+ public Void run() throws Exception
+ {
+ Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+ Assert.assertEquals(fs.getHomeDirectory() + "/datatorrent", path.toString());
+ return null;
+ }
+ });
+ }
+
+ /**
+ * apex.dfsRootDirectory set: absolute path with %USER_NAME% and impersonation enabled
+ * @throws IOException
+ * @throws InterruptedException
+ *
+ */
+ @Test
+ public void getApexDFSRootDirAbsPathAndVar() throws IOException, InterruptedException
+ {
+ final Configuration conf = new YarnConfiguration(new Configuration(false));
+ conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "/x/%USER_NAME%/z");
+ conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true);
+
+ final FileSystem fs = FileSystem.newInstance(conf);
+ UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+ UserGroupInformation.setLoginUser(testUser);
+ UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("impersonated", new String[]{""});
+
+ doAsUser.doAs(new PrivilegedExceptionAction<Void>()
+ {
+ @Override
+ public Void run() throws Exception
+ {
+ Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+ Assert.assertEquals("file:/x/impersonated/z", path.toString());
+ return null;
+ }
+ });
+ }
+
+ /**
+ * apex.dfsRootDirectory set: relative path and impersonation enabled and doAS
+ * @throws IOException
+ * @throws InterruptedException
+ *
+ */
+ @Test
+ public void getApexDFSRootDirRelPathAndImpersonation() throws IOException, InterruptedException
+ {
+ final Configuration conf = new YarnConfiguration(new Configuration(false));
+ conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "apex");
+ conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true);
+
+ final FileSystem fs = FileSystem.newInstance(conf);
+ UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+ UserGroupInformation.setLoginUser(testUser);
+ UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("testUser2", new String[]{""});
+
+ doAsUser.doAs(new PrivilegedExceptionAction<Void>()
+ {
+ @Override
+ public Void run() throws Exception
+ {
+ Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+ Assert.assertEquals("file:/user/testUser2/apex", path.toString());
+ return null;
+ }
+ });
+ }
+
+ /**
+ * apex.dfsRootDirectory set: relative path blank and impersonation enabled and doAS
+ * @throws IOException
+ * @throws InterruptedException
+ *
+ */
+ @Test
+ public void getApexDFSRootDirBlankPathAndImpersonation() throws IOException, InterruptedException
+ {
+ final Configuration conf = new YarnConfiguration(new Configuration(false));
+ conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true);
+
+ final FileSystem fs = FileSystem.newInstance(conf);
+ UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+ UserGroupInformation.setLoginUser(testUser);
+ UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("testUser2", new String[]{""});
+
+ doAsUser.doAs(new PrivilegedExceptionAction<Void>()
+ {
+ @Override
+ public Void run() throws Exception
+ {
+ Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+ Assert.assertEquals("file:/user/testUser2/datatorrent", path.toString());
+ return null;
+ }
+ });
+ }
+
+ /**
+ * apex.dfsRootDirectory set: relative path having %USER_NAME% and impersonation enabled and doAS
+ * Make sure currentUser appears twice
+ * @throws IOException
+ * @throws InterruptedException
+ *
+ */
+ @Test
+ public void getApexDFSRootDirRelPathVarAndImpersonation() throws IOException, InterruptedException
+ {
+ final Configuration conf = new YarnConfiguration(new Configuration(false));
+ conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "apex/%USER_NAME%/xyz");
+ conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true);
+
+ final FileSystem fs = FileSystem.newInstance(conf);
+ UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
+ UserGroupInformation.setLoginUser(testUser);
+ UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("testUser2", new String[]{""});
+
+ doAsUser.doAs(new PrivilegedExceptionAction<Void>()
+ {
+ @Override
+ public Void run() throws Exception
+ {
+ Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
+ Assert.assertEquals("file:/user/testUser2/apex/testUser2/xyz", path.toString());
+ return null;
+ }
+ });
+ }
}