Merge branch 'APEXCORE-726' of github.com:PramodSSImmaneni/apex-core
diff --git a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
index 437c2fd..598cdba 100644
--- a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
+++ b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
@@ -30,6 +30,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.engine.security.ACLManager;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.text.StrSubstitutor;
 import org.apache.hadoop.fs.FileStatus;
@@ -38,6 +40,7 @@
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.HadoopKerberosName;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -150,6 +153,28 @@
     ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
 
     setClasspath(containerEnv);
+
+    // Setup ACLs for the impersonating user
+    try {
+      String launchPrincipal = System.getenv("HADOOP_USER_NAME");
+      LOG.debug("Launch principal {}", launchPrincipal);
+      if (launchPrincipal != null) {
+        String launchUserName = launchPrincipal;
+        if (UserGroupInformation.isSecurityEnabled()) {
+          try {
+            launchUserName = new HadoopKerberosName(launchPrincipal).getShortName();
+          } catch (Exception ex) {
+            LOG.warn("Error resolving kerberos principal {}", launchPrincipal, ex);
+          }
+        }
+        LOG.debug("ACL launch user {} current user {}", launchUserName, UserGroupInformation.getCurrentUser().getShortUserName());
+        if (!UserGroupInformation.getCurrentUser().getShortUserName().equals(launchUserName)) {
+          ACLManager.setupUserACLs(ctx, launchUserName, nmClient.getConfig());
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Unable to setup user acls for container {}", container.getId(), e);
+    }
     try {
       // propagate to replace node managers user name (effective in non-secure mode)
       containerEnv.put("HADOOP_USER_NAME", UserGroupInformation.getLoginUser().getUserName());
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index f8a3194..c5017eb 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -37,6 +37,7 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.apex.common.util.JarHelper;
+import org.apache.apex.engine.security.ACLManager;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.ArrayUtils;
@@ -431,6 +432,12 @@
       amContainer.setTokens(fsTokens);
     }
 
+    // Setup ACLs for the impersonating user
+    LOG.debug("ACL login user {} current user {}", UserGroupInformation.getLoginUser(), UserGroupInformation.getCurrentUser());
+    if (!UserGroupInformation.getCurrentUser().equals(UserGroupInformation.getLoginUser())) {
+      ACLManager.setupUserACLs(amContainer, UserGroupInformation.getLoginUser().getShortUserName(), conf);
+    }
+
     // set local resources for the application master
     // local files or archives as needed
     // In this scenario, the jar file for the application master is part of the local resources
@@ -546,6 +553,7 @@
       }
       env.put("CLASSPATH", classPathEnv.toString());
       // propagate to replace node managers user name (effective in non-secure mode)
+      // also identifies the original login user during impersonation, which is needed for setting up ACLs
       env.put("HADOOP_USER_NAME", UserGroupInformation.getLoginUser().getUserName());
 
       amContainer.setEnvironment(env);
diff --git a/engine/src/main/java/org/apache/apex/engine/security/ACLManager.java b/engine/src/main/java/org/apache/apex/engine/security/ACLManager.java
new file mode 100644
index 0000000..9db0251
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/security/ACLManager.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.security;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Helper for configuring YARN application ACLs on container launch contexts so that a
+ * given user is granted view and modify access to the application.
+ */
+public class ACLManager
+{
+
+  private static final Logger logger = LoggerFactory.getLogger(ACLManager.class);
+
+  /**
+   * Grants the given user {@code VIEW_APP} and {@code MODIFY_APP} access on the launch context
+   * when {@link #areACLsRequired(Configuration)} reports that ACLs are needed; otherwise the
+   * launch context is left untouched.
+   *
+   * @param launchContext container launch context to set the application ACLs on
+   * @param userName user to grant view and modify access to
+   * @param conf configuration consulted to decide whether ACLs are needed
+   * @throws IOException declared for callers; not thrown by the current implementation
+   */
+  public static void setupUserACLs(ContainerLaunchContext launchContext, String userName, Configuration conf) throws IOException
+  {
+    logger.debug("Setup login acls {}", userName);
+    if (areACLsRequired(conf)) {
+      logger.debug("Configuring ACLs for {}", userName);
+      Map<ApplicationAccessType, String> acls = Maps.newHashMap();
+      acls.put(ApplicationAccessType.VIEW_APP, userName);
+      acls.put(ApplicationAccessType.MODIFY_APP, userName);
+      launchContext.setApplicationACLs(acls);
+    }
+  }
+
+  /**
+   * Checks whether application ACLs have to be set explicitly: YARN ACLs are enabled and the
+   * admin ACL differs from the default.
+   * An admin ACL that is not set in the configuration is treated as the default.
+   *
+   * @param conf configuration to read the ACL settings from
+   * @return {@code true} if ACLs are enabled and the admin ACL is not the default
+   */
+  public static boolean areACLsRequired(Configuration conf)
+  {
+    logger.debug("Check ACLs required");
+    if (conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE, YarnConfiguration.DEFAULT_YARN_ACL_ENABLE)) {
+      // Fall back to the default when the property is not set so that a missing entry
+      // is not mistaken for a non-default admin ACL
+      String adminACL = conf.get(YarnConfiguration.YARN_ADMIN_ACL, YarnConfiguration.DEFAULT_YARN_ADMIN_ACL);
+      logger.debug("Admin ACL {}", adminACL);
+      // Surrounding whitespace does not make the value a different ACL
+      if (!YarnConfiguration.DEFAULT_YARN_ADMIN_ACL.equals(adminACL.trim())) {
+        logger.debug("Non default admin ACL");
+        return true;
+      }
+    }
+    return false;
+  }
+
+}