FALCON-2321 When Prism is Kerberized, it doesn't pass its token while making calls to Falcon Server

Author: Pallavi Nagesha Rao <pallavi.rao@im1738-x3.corp.inmobi.com>

Reviewers: @sandeepSamudrala

Closes #395 from pallavi-rao/FALCON-2321 and squashes the following commits:

f6c3c4352 [Pallavi Nagesha Rao] FALCON-2321 Comment change
8d7d7b06e [Pallavi Nagesha Rao] FALCON-2321 Removing unncessary whitespace
310934caa [Pallavi Nagesha Rao] FALCON-2321 When Prism is Kerberized, it doesn't pass its token while making calls to Falcon Server
diff --git a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
index 187d6c7..401d56b 100644
--- a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
+++ b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
@@ -22,17 +22,27 @@
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HTTPSProperties;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.FalconWebException;
 import org.apache.falcon.LifeCycle;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.proxy.BufferedRequest;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.DeploymentProperties;
 import org.apache.falcon.util.RuntimeProperties;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
+import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
@@ -49,6 +59,8 @@
 import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
+import java.net.URL;
+import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.Properties;
 
@@ -68,6 +80,18 @@
     private String serviceName;
     private Class service;
 
+    /* Name of the HTTP cookie used for the authentication token between Prism and Falcon server.
+     */
+    private static final String AUTH_COOKIE = "hadoop.auth";
+    private static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "=";
+
+    protected static final KerberosAuthenticator AUTHENTICATOR = new KerberosAuthenticator();
+    protected static final HostnameVerifier ALL_TRUSTING_HOSTNAME_VERIFIER = new HostnameVerifier() {
+        public boolean verify(String hostname, SSLSession sslSession) {
+            return true;
+        }
+    };
+
     public void init(String inColo, String inServiceName) throws FalconException {
         this.colo = inColo;
         this.serviceName = inServiceName;
@@ -93,7 +117,7 @@
         try {
             Method method = getMethod(service, methodName, args);
             String urlPrefix = getFalconEndPoint();
-            String url = urlPrefix + "/" + pathValue(method, args);
+            final String url = urlPrefix + "/" + pathValue(method, args);
             LOG.debug("Executing {}", url);
 
             incomingRequest = getIncomingRequest(args);
@@ -101,17 +125,30 @@
             String httpMethod = getHttpMethod(method);
             String mimeType = getConsumes(method);
             String accept = MediaType.WILDCARD;
-            String user = CurrentUser.getUser();
+            final String user = CurrentUser.getUser();
 
             String doAsUser = incomingRequest.getParameter(DO_AS_PARAM);
-
             WebResource resource =  getClient()
                     .resource(UriBuilder.fromUri(url).build().normalize())
                     .queryParam("user.name", user);
             if (doAsUser != null) {
                 resource = resource.queryParam("doAs", doAsUser);
             }
-            ClientResponse response = resource.accept(accept).type(mimeType)
+
+            AuthenticatedURL.Token authenticationToken = null;
+            if (SecurityUtil.isSecurityEnabled()) {
+                UserGroupInformation ugiLoginUser = UserGroupInformation.getCurrentUser();
+                LOG.debug("Security is enabled. Using DoAs : " + ugiLoginUser.getUserName());
+                authenticationToken = ugiLoginUser.doAs(new PrivilegedExceptionAction<AuthenticatedURL.Token>() {
+                    @Override
+                    public AuthenticatedURL.Token run() throws Exception {
+                        return getToken(url + PseudoAuthenticator.USER_NAME + "=" + user, getClient());
+                    }
+                });
+            }
+
+            ClientResponse response = resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+                    .accept(accept).type(mimeType)
                     .method(httpMethod, ClientResponse.class,
                             (isPost(httpMethod) ? incomingRequest.getInputStream() : null));
             incomingRequest.getInputStream().reset();
@@ -242,4 +279,28 @@
         }
         return consumes.value()[0];
     }
+
+    protected AuthenticatedURL.Token getToken(String baseUrl, Client client) throws FalconException {
+        AuthenticatedURL.Token currentToken = new AuthenticatedURL.Token();
+        try {
+            URL url = new URL(baseUrl);
+            // using KerberosAuthenticator which falls back to PsuedoAuthenticator
+            // instead of passing authentication type from the command line - bad factory
+            HTTPSProperties httpsProperties = ((HTTPSProperties)
+                    client.getProperties().get(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES));
+            SSLContext sslContext = null;
+            if (httpsProperties != null) {
+                sslContext = httpsProperties.getSSLContext();
+            }
+            if (sslContext != null) {
+                HttpsURLConnection.setDefaultSSLSocketFactory(sslContext.getSocketFactory());
+                HttpsURLConnection.setDefaultHostnameVerifier(ALL_TRUSTING_HOSTNAME_VERIFIER);
+            }
+            new AuthenticatedURL(AUTHENTICATOR).openConnection(url, currentToken);
+        } catch (Exception ex) {
+            throw new FalconException("Could not authenticate, " + ex.getMessage(), ex);
+        }
+
+        return currentToken;
+    }
 }
diff --git a/prism/src/main/java/org/apache/falcon/util/Servlets.java b/prism/src/main/java/org/apache/falcon/util/Servlets.java
index 664ad4c..8c42fab 100644
--- a/prism/src/main/java/org/apache/falcon/util/Servlets.java
+++ b/prism/src/main/java/org/apache/falcon/util/Servlets.java
@@ -40,12 +40,12 @@
      * @return the user
      */
     public static String getUserFromRequest(HttpServletRequest httpRequest) {
-        String user = httpRequest.getRemoteUser();
+        String user = httpRequest.getParameter("user.name"); // available in query-param
         if (!StringUtils.isEmpty(user)) {
             return user;
         }
 
-        user = httpRequest.getParameter("user.name"); // available in query-param
+        user = httpRequest.getRemoteUser();
         if (!StringUtils.isEmpty(user)) {
             return user;
         }