APEXCORE-726 Setup up acls for the impersonating user to access the application when the application is launched with impersonation
diff --git a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
index 76c1407..b90cdab 100644
--- a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
+++ b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
@@ -30,6 +30,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.engine.security.ACLManager;
+
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrSubstitutor;
import org.apache.hadoop.fs.FileStatus;
@@ -38,6 +40,7 @@
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.HadoopKerberosName;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -150,6 +153,28 @@
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
setClasspath(containerEnv);
+
+ // Setup ACLs for the impersonating user
+ try {
+ String launchPrincipal = System.getenv("HADOOP_USER_NAME");
+ LOG.debug("Launch principal {}", launchPrincipal);
+ if (launchPrincipal != null) {
+ String launchUserName = launchPrincipal;
+ if (UserGroupInformation.isSecurityEnabled()) {
+ try {
+ launchUserName = new HadoopKerberosName(launchPrincipal).getShortName();
+ } catch (Exception ex) {
+ LOG.warn("Error resolving kerberos principal {}", launchPrincipal, ex);
+ }
+ }
+ LOG.debug("ACL launch user {} current user {}", launchUserName, UserGroupInformation.getCurrentUser().getShortUserName());
+ if (!UserGroupInformation.getCurrentUser().getShortUserName().equals(launchUserName)) {
+ ACLManager.setupUserACLs(ctx, launchUserName, nmClient.getConfig());
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Unable to setup user acls for container {}", container.getId(), e);
+ }
try {
// propagate to replace node managers user name (effective in non-secure mode)
containerEnv.put("HADOOP_USER_NAME", UserGroupInformation.getLoginUser().getUserName());
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index b280aad..f7d3da9 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -37,6 +37,7 @@
import org.slf4j.LoggerFactory;
import org.apache.apex.common.util.JarHelper;
+import org.apache.apex.engine.security.ACLManager;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.ArrayUtils;
@@ -431,6 +432,12 @@
amContainer.setTokens(fsTokens);
}
+ // Setup ACLs for the impersonating user
+ LOG.debug("ACL login user {} current user {}", UserGroupInformation.getLoginUser(), UserGroupInformation.getCurrentUser());
+ if (!UserGroupInformation.getCurrentUser().equals(UserGroupInformation.getLoginUser())) {
+ ACLManager.setupUserACLs(amContainer, UserGroupInformation.getLoginUser().getShortUserName(), conf);
+ }
+
// set local resources for the application master
// local files or archives as needed
// In this scenario, the jar file for the application master is part of the local resources
@@ -546,6 +553,7 @@
}
env.put("CLASSPATH", classPathEnv.toString());
// propagate to replace node managers user name (effective in non-secure mode)
+ // also to indicate original login user during impersonation and important for setting ACLs
env.put("HADOOP_USER_NAME", UserGroupInformation.getLoginUser().getUserName());
amContainer.setEnvironment(env);
diff --git a/engine/src/main/java/org/apache/apex/engine/security/ACLManager.java b/engine/src/main/java/org/apache/apex/engine/security/ACLManager.java
new file mode 100644
index 0000000..9db0251
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/security/ACLManager.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.security;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.collect.Maps;
+
+/**
+ *
+ */
+public class ACLManager
+{
+
+ private static final Logger logger = LoggerFactory.getLogger(ACLManager.class);
+
+ public static void setupUserACLs(ContainerLaunchContext launchContext, String userName, Configuration conf) throws IOException
+ {
+ logger.debug("Setup login acls {}", userName);
+ if (areACLsRequired(conf)) {
+ logger.debug("Configuring ACLs for {}", userName);
+ Map<ApplicationAccessType, String> acls = Maps.newHashMap();
+ acls.put(ApplicationAccessType.VIEW_APP, userName);
+ acls.put(ApplicationAccessType.MODIFY_APP, userName);
+ launchContext.setApplicationACLs(acls);
+ }
+ }
+
+ public static boolean areACLsRequired(Configuration conf)
+ {
+ logger.debug("Check ACLs required");
+ if (conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE, YarnConfiguration.DEFAULT_YARN_ACL_ENABLE)) {
+ logger.debug("Admin ACL {}", conf.get(YarnConfiguration.YARN_ADMIN_ACL));
+ if (!YarnConfiguration.DEFAULT_YARN_ADMIN_ACL.equals(conf.get(YarnConfiguration.YARN_ADMIN_ACL))) {
+ logger.debug("Non default admin ACL");
+ return true;
+ }
+ }
+ return false;
+ }
+
+}