AMBARI-19369. Add Kerberos HTTP SPNEGO authentication support to Hadoop/hbase/kafka/storm sinks (Qin Liu via rlevas)
diff --git a/ambari-metrics-common/pom.xml b/ambari-metrics-common/pom.xml
index 62ae75f..f0d3963 100644
--- a/ambari-metrics-common/pom.xml
+++ b/ambari-metrics-common/pom.xml
@@ -189,5 +189,10 @@
       <artifactId>powermock-module-junit4</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.2.5</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index a8dc571..fddf4b3 100644
--- a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorUnavailableException;
 import org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardHostnameHashingStrategy;
 import org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardStrategy;
+import org.apache.http.HttpStatus;
 import org.codehaus.jackson.map.AnnotationIntrospector;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -83,6 +84,9 @@
   public static final String COLLECTOR_LIVE_NODES_PATH = "/ws/v1/timeline/metrics/livenodes";
   public static final String INSTANCE_ID_PROPERTY = "instanceId";
   public static final String SET_INSTANCE_ID_PROPERTY = "set.instanceId";
+  public static final String COOKIE = "Cookie";
+  private static final String WWW_AUTHENTICATE = "WWW-Authenticate";
+  private static final String NEGOTIATE = "Negotiate";
 
   protected static final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
   public static int NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 100;
@@ -97,6 +101,7 @@
   private long lastFailedZkRequestTime = 0l;
 
   private SSLSocketFactory sslSocketFactory;
+  private AppCookieManager appCookieManager = null;
 
   protected final Log LOG;
 
@@ -157,6 +162,18 @@
       connection = connectUrl.startsWith("https") ?
           getSSLConnection(connectUrl) : getConnection(connectUrl);
 
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("emitMetricsJson to " + connectUrl + ", " + jsonData);
+      }
+      AppCookieManager appCookieManager = getAppCookieManager();
+      String appCookie = appCookieManager.getCachedAppCookie(connectUrl);
+      if (appCookie != null) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Using cached app cookie for URL:" + connectUrl);
+        }
+        connection.setRequestProperty(COOKIE, appCookie);
+      }
+
       connection.setRequestMethod("POST");
       connection.setRequestProperty("Content-Type", "application/json");
       connection.setRequestProperty("Connection", "Keep-Alive");
@@ -171,6 +188,37 @@
       }
 
       int statusCode = connection.getResponseCode();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("emitMetricsJson: statusCode = " + statusCode);
+      }
+
+      if (statusCode == HttpStatus.SC_UNAUTHORIZED ) {
+        String wwwAuthHeader = connection.getHeaderField(WWW_AUTHENTICATE);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Received WWW-Authentication header:" + wwwAuthHeader + ", for URL:" + connectUrl);
+        }
+        if (wwwAuthHeader != null && wwwAuthHeader.trim().startsWith(NEGOTIATE)) {
+          appCookie = appCookieManager.getAppCookie(connectUrl, true);
+          if (appCookie != null) {
+            connection.setRequestProperty(COOKIE, appCookie);
+
+            if (jsonData != null) {
+              try (OutputStream os = connection.getOutputStream()) {
+                os.write(jsonData.getBytes("UTF-8"));
+              }
+            }
+
+            statusCode = connection.getResponseCode();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("emitMetricsJson: statusCode2 = " + statusCode);
+            }
+          }
+        } else {
+          // no supported authentication type found
+          // we would let the original response propagate
+          LOG.error("Unsupported WWW-Authentication header:" + wwwAuthHeader+ ", for URL:" + connectUrl);
+        }
+      }
 
       if (statusCode != 200) {
         LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
@@ -265,6 +313,18 @@
   }
 
   /**
+   * Get the associated app cookie manager.
+   *
+   * @return the app cookie manager
+   */
+  public synchronized AppCookieManager getAppCookieManager() {
+    if (appCookieManager == null) {
+      appCookieManager = new AppCookieManager();
+    }
+    return appCookieManager;
+  }
+
+  /**
    * Cleans up and closes an input stream
    * see http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
    * @param is the InputStream to clean up
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManager.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManager.java
new file mode 100644
index 0000000..bcba238
--- /dev/null
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManager.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.Header;
+import org.apache.http.HeaderElement;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.Credentials;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpOptions;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.params.AuthPolicy;
+import org.apache.http.impl.auth.SPNegoSchemeFactory;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+
+/**
+ * Handles SPNego authentication as a client of hadoop service, caches
+ * hadoop.auth cookie returned by hadoop service on successful SPNego
+ * authentication. Refreshes hadoop.auth cookie on demand if the cookie has
+ * expired.
+ *
+ */
+public class AppCookieManager {
+
+  static final String HADOOP_AUTH = "hadoop.auth";
+  private static final String HADOOP_AUTH_EQ = "hadoop.auth=";
+  private static final String SET_COOKIE = "Set-Cookie";
+
+  private static final EmptyJaasCredentials EMPTY_JAAS_CREDENTIALS = new EmptyJaasCredentials();
+
+  private Map<String, String> endpointCookieMap = new ConcurrentHashMap<String, String>();
+  private static Log LOG = LogFactory.getLog(AppCookieManager.class);
+
+  /**
+   * Utility method to exercise AppCookieManager directly
+   * @param args element 0 of args should be a URL to hadoop service protected by SPengo
+   * @throws IOException in case of errors
+   */
+  public static void main(String[] args) throws IOException {
+    new AppCookieManager().getAppCookie(args[0], false);
+  }
+
+  public AppCookieManager() {
+  }
+
+  /**
+   * Returns hadoop.auth cookie, doing needed SPNego authentication
+   *
+   * @param endpoint
+   *          the URL of the Hadoop service
+   * @param refresh
+   *          flag indicating wehther to refresh the cookie, if
+   *          <code>true</code>, we do a new SPNego authentication and refresh
+   *          the cookie even if the cookie already exists in local cache
+   * @return hadoop.auth cookie value
+   * @throws IOException
+   *           in case of problem getting hadoop.auth cookie
+   */
+  public String getAppCookie(String endpoint, boolean refresh)
+      throws IOException {
+
+    HttpUriRequest outboundRequest = new HttpGet(endpoint);
+    URI uri = outboundRequest.getURI();
+    String scheme = uri.getScheme();
+    String host = uri.getHost();
+    int port = uri.getPort();
+    String path = uri.getPath();
+    if (!refresh) {
+      String appCookie = endpointCookieMap.get(endpoint);
+      if (appCookie != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("got cached cookie");
+        }
+        return appCookie;
+      }
+    }
+
+    clearAppCookie(endpoint);
+
+    DefaultHttpClient client = new DefaultHttpClient();
+    SPNegoSchemeFactory spNegoSF = new SPNegoSchemeFactory(/* stripPort */true);
+    client.getAuthSchemes().register(AuthPolicy.SPNEGO, spNegoSF);
+    client.getCredentialsProvider().setCredentials(
+        new AuthScope(/* host */null, /* port */-1, /* realm */null),
+        EMPTY_JAAS_CREDENTIALS);
+
+    String hadoopAuthCookie = null;
+    HttpResponse httpResponse = null;
+    try {
+      HttpHost httpHost = new HttpHost(host, port, scheme);
+      HttpRequest httpRequest = new HttpOptions(path);
+      httpResponse = client.execute(httpHost, httpRequest);
+      Header[] headers = httpResponse.getHeaders(SET_COOKIE);
+      if (LOG.isDebugEnabled()) {
+        for (Header header : headers) {
+          LOG.debug(header.getName() + " : " + header.getValue());
+        }
+      }
+      hadoopAuthCookie = getHadoopAuthCookieValue(headers);
+      if (hadoopAuthCookie == null) {
+        int statusCode = httpResponse.getStatusLine().getStatusCode();
+        HttpEntity entity = httpResponse.getEntity();
+        String responseBody = entity != null ? EntityUtils.toString(entity) : null;
+        LOG.error("SPNego authentication failed with statusCode = " + statusCode + ", responseBody = " + responseBody + ", can not get hadoop.auth cookie for URL: " + endpoint);
+        return null;
+      }
+    } finally {
+      if (httpResponse != null) {
+        HttpEntity entity = httpResponse.getEntity();
+        if (entity != null) {
+          entity.getContent().close();
+        }
+      }
+
+    }
+
+    hadoopAuthCookie = HADOOP_AUTH_EQ + quote(hadoopAuthCookie);
+    setAppCookie(endpoint, hadoopAuthCookie);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Successful SPNego authentication to URL:" + uri.toString());
+    }
+    return hadoopAuthCookie;
+  }
+
+
+  /**
+   * Returns the cached app cookie
+   *  @param endpoint the hadoop end point we authenticate to
+   * @return the cached app cookie, can be null
+   */
+  public String getCachedAppCookie(String endpoint) {
+    return endpointCookieMap.get(endpoint);
+  }
+
+  /**
+   *  Sets the cached app cookie cache
+   *  @param endpoint the hadoop end point we authenticate to
+   *  @param appCookie the app cookie
+   */
+  private void setAppCookie(String endpoint, String appCookie) {
+    endpointCookieMap.put(endpoint, appCookie);
+  }
+
+  /**
+   *  Clears the cached app cookie
+   *  @param endpoint the hadoop end point we authenticate to
+   */
+  private void clearAppCookie(String endpoint) {
+    endpointCookieMap.remove(endpoint);
+  }
+
+  static String quote(String s) {
+    return s == null ? s : "\"" + s + "\"";
+  }
+
+  static String getHadoopAuthCookieValue(Header[] headers) {
+    if (headers == null) {
+      return null;
+    }
+    for (Header header : headers) {
+      HeaderElement[] elements = header.getElements();
+      for (HeaderElement element : elements) {
+        String cookieName = element.getName();
+        if (cookieName.equals(HADOOP_AUTH)) {
+          if (element.getValue() != null) {
+            String trimmedVal = element.getValue().trim();
+            if (!trimmedVal.isEmpty()) {
+              return trimmedVal;
+            }
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+
+  private static class EmptyJaasCredentials implements Credentials {
+
+    public String getPassword() {
+      return null;
+    }
+
+    public Principal getUserPrincipal() {
+      return null;
+    }
+
+  }
+
+}
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManagerTest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManagerTest.java
new file mode 100644
index 0000000..8355288
--- /dev/null
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManagerTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.junit.Test;
+
+public class AppCookieManagerTest {
+
+  @Test
+  public void getCachedAppCookie() {
+    assertNull(new AppCookieManager().getCachedAppCookie("http://dummy"));
+  }
+
+  @Test
+  public void getHadoopAuthCookieValueWithNullHeaders() {
+    assertNull(AppCookieManager.getHadoopAuthCookieValue(null));
+  }
+
+  @Test
+  public void getHadoopAuthCookieValueWitEmptylHeaders() {
+    assertNull(AppCookieManager.getHadoopAuthCookieValue(new Header[0]));
+  }
+
+  @Test
+  public void getHadoopAuthCookieValueWithValidlHeaders() {
+    Header[] headers = new Header[1];
+    headers[0] = new BasicHeader("Set-Cookie", AppCookieManager.HADOOP_AUTH + "=dummyvalue");
+    assertNotNull(AppCookieManager.getHadoopAuthCookieValue(headers));
+  }
+
+}