AMBARI-19369. Add Kerberos HTTP SPNEGO authentication support to Hadoop/hbase/kafka/storm sinks (Qin Liu via rlevas)
diff --git a/ambari-metrics-common/pom.xml b/ambari-metrics-common/pom.xml
index 62ae75f..f0d3963 100644
--- a/ambari-metrics-common/pom.xml
+++ b/ambari-metrics-common/pom.xml
@@ -189,5 +189,10 @@
<artifactId>powermock-module-junit4</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.2.5</version>
+ </dependency>
</dependencies>
</project>
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index a8dc571..fddf4b3 100644
--- a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorUnavailableException;
import org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardHostnameHashingStrategy;
import org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardStrategy;
+import org.apache.http.HttpStatus;
import org.codehaus.jackson.map.AnnotationIntrospector;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -83,6 +84,9 @@
public static final String COLLECTOR_LIVE_NODES_PATH = "/ws/v1/timeline/metrics/livenodes";
public static final String INSTANCE_ID_PROPERTY = "instanceId";
public static final String SET_INSTANCE_ID_PROPERTY = "set.instanceId";
+ public static final String COOKIE = "Cookie";
+ private static final String WWW_AUTHENTICATE = "WWW-Authenticate";
+ private static final String NEGOTIATE = "Negotiate";
protected static final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
public static int NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 100;
@@ -97,6 +101,7 @@
private long lastFailedZkRequestTime = 0l;
private SSLSocketFactory sslSocketFactory;
+ private AppCookieManager appCookieManager = null;
protected final Log LOG;
@@ -157,6 +162,18 @@
connection = connectUrl.startsWith("https") ?
getSSLConnection(connectUrl) : getConnection(connectUrl);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("emitMetricsJson to " + connectUrl + ", " + jsonData);
+ }
+ AppCookieManager appCookieManager = getAppCookieManager();
+ String appCookie = appCookieManager.getCachedAppCookie(connectUrl);
+ if (appCookie != null) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Using cached app cookie for URL:" + connectUrl);
+ }
+ connection.setRequestProperty(COOKIE, appCookie);
+ }
+
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/json");
connection.setRequestProperty("Connection", "Keep-Alive");
@@ -171,6 +188,37 @@
}
int statusCode = connection.getResponseCode();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("emitMetricsJson: statusCode = " + statusCode);
+ }
+
+ if (statusCode == HttpStatus.SC_UNAUTHORIZED ) {
+ String wwwAuthHeader = connection.getHeaderField(WWW_AUTHENTICATE);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Received WWW-Authentication header:" + wwwAuthHeader + ", for URL:" + connectUrl);
+ }
+ if (wwwAuthHeader != null && wwwAuthHeader.trim().startsWith(NEGOTIATE)) {
+ appCookie = appCookieManager.getAppCookie(connectUrl, true);
+ if (appCookie != null) {
+ connection.setRequestProperty(COOKIE, appCookie);
+
+ if (jsonData != null) {
+ try (OutputStream os = connection.getOutputStream()) {
+ os.write(jsonData.getBytes("UTF-8"));
+ }
+ }
+
+ statusCode = connection.getResponseCode();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("emitMetricsJson: statusCode2 = " + statusCode);
+ }
+ }
+ } else {
+ // no supported authentication type found
+ // we would let the original response propagate
+ LOG.error("Unsupported WWW-Authentication header:" + wwwAuthHeader+ ", for URL:" + connectUrl);
+ }
+ }
if (statusCode != 200) {
LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
@@ -265,6 +313,18 @@
}
/**
+ * Get the associated app cookie manager.
+ *
+ * @return the app cookie manager
+ */
+ public synchronized AppCookieManager getAppCookieManager() {
+ if (appCookieManager == null) {
+ appCookieManager = new AppCookieManager();
+ }
+ return appCookieManager;
+ }
+
+ /**
* Cleans up and closes an input stream
* see http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
* @param is the InputStream to clean up
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManager.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManager.java
new file mode 100644
index 0000000..bcba238
--- /dev/null
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManager.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.Header;
+import org.apache.http.HeaderElement;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.Credentials;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpOptions;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.params.AuthPolicy;
+import org.apache.http.impl.auth.SPNegoSchemeFactory;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+
+/**
+ * Handles SPNego authentication as a client of hadoop service, caches
+ * hadoop.auth cookie returned by hadoop service on successful SPNego
+ * authentication. Refreshes hadoop.auth cookie on demand if the cookie has
+ * expired.
+ *
+ */
+public class AppCookieManager {
+
+ static final String HADOOP_AUTH = "hadoop.auth";
+ private static final String HADOOP_AUTH_EQ = "hadoop.auth=";
+ private static final String SET_COOKIE = "Set-Cookie";
+
+ private static final EmptyJaasCredentials EMPTY_JAAS_CREDENTIALS = new EmptyJaasCredentials();
+
+ private Map<String, String> endpointCookieMap = new ConcurrentHashMap<String, String>();
+ private static Log LOG = LogFactory.getLog(AppCookieManager.class);
+
+ /**
+ * Utility method to exercise AppCookieManager directly
+ * @param args element 0 of args should be a URL to hadoop service protected by SPNego
+ * @throws IOException in case of errors
+ */
+ public static void main(String[] args) throws IOException {
+ new AppCookieManager().getAppCookie(args[0], false);
+ }
+
+ public AppCookieManager() {
+ }
+
+ /**
+ * Returns hadoop.auth cookie, doing needed SPNego authentication
+ *
+ * @param endpoint
+ * the URL of the Hadoop service
+ * @param refresh
+ * flag indicating whether to refresh the cookie, if
+ * <code>true</code>, we do a new SPNego authentication and refresh
+ * the cookie even if the cookie already exists in local cache
+ * @return hadoop.auth cookie value
+ * @throws IOException
+ * in case of problem getting hadoop.auth cookie
+ */
+ public String getAppCookie(String endpoint, boolean refresh)
+ throws IOException {
+
+ HttpUriRequest outboundRequest = new HttpGet(endpoint);
+ URI uri = outboundRequest.getURI();
+ String scheme = uri.getScheme();
+ String host = uri.getHost();
+ int port = uri.getPort();
+ String path = uri.getPath();
+ if (!refresh) {
+ String appCookie = endpointCookieMap.get(endpoint);
+ if (appCookie != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("got cached cookie");
+ }
+ return appCookie;
+ }
+ }
+
+ clearAppCookie(endpoint);
+
+ DefaultHttpClient client = new DefaultHttpClient();
+ SPNegoSchemeFactory spNegoSF = new SPNegoSchemeFactory(/* stripPort */true);
+ client.getAuthSchemes().register(AuthPolicy.SPNEGO, spNegoSF);
+ client.getCredentialsProvider().setCredentials(
+ new AuthScope(/* host */null, /* port */-1, /* realm */null),
+ EMPTY_JAAS_CREDENTIALS);
+
+ String hadoopAuthCookie = null;
+ HttpResponse httpResponse = null;
+ try {
+ HttpHost httpHost = new HttpHost(host, port, scheme);
+ HttpRequest httpRequest = new HttpOptions(path);
+ httpResponse = client.execute(httpHost, httpRequest);
+ Header[] headers = httpResponse.getHeaders(SET_COOKIE);
+ if (LOG.isDebugEnabled()) {
+ for (Header header : headers) {
+ LOG.debug(header.getName() + " : " + header.getValue());
+ }
+ }
+ hadoopAuthCookie = getHadoopAuthCookieValue(headers);
+ if (hadoopAuthCookie == null) {
+ int statusCode = httpResponse.getStatusLine().getStatusCode();
+ HttpEntity entity = httpResponse.getEntity();
+ String responseBody = entity != null ? EntityUtils.toString(entity) : null;
+ LOG.error("SPNego authentication failed with statusCode = " + statusCode + ", responseBody = " + responseBody + ", can not get hadoop.auth cookie for URL: " + endpoint);
+ return null;
+ }
+ } finally {
+ if (httpResponse != null) {
+ HttpEntity entity = httpResponse.getEntity();
+ if (entity != null) {
+ entity.getContent().close();
+ }
+ }
+
+ }
+
+ hadoopAuthCookie = HADOOP_AUTH_EQ + quote(hadoopAuthCookie);
+ setAppCookie(endpoint, hadoopAuthCookie);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Successful SPNego authentication to URL:" + uri.toString());
+ }
+ return hadoopAuthCookie;
+ }
+
+
+ /**
+ * Returns the cached app cookie
+ * @param endpoint the hadoop end point we authenticate to
+ * @return the cached app cookie, can be null
+ */
+ public String getCachedAppCookie(String endpoint) {
+ return endpointCookieMap.get(endpoint);
+ }
+
+ /**
+ * Sets the cached app cookie cache
+ * @param endpoint the hadoop end point we authenticate to
+ * @param appCookie the app cookie
+ */
+ private void setAppCookie(String endpoint, String appCookie) {
+ endpointCookieMap.put(endpoint, appCookie);
+ }
+
+ /**
+ * Clears the cached app cookie
+ * @param endpoint the hadoop end point we authenticate to
+ */
+ private void clearAppCookie(String endpoint) {
+ endpointCookieMap.remove(endpoint);
+ }
+
+ static String quote(String s) {
+ return s == null ? s : "\"" + s + "\"";
+ }
+
+ static String getHadoopAuthCookieValue(Header[] headers) {
+ if (headers == null) {
+ return null;
+ }
+ for (Header header : headers) {
+ HeaderElement[] elements = header.getElements();
+ for (HeaderElement element : elements) {
+ String cookieName = element.getName();
+ if (cookieName.equals(HADOOP_AUTH)) {
+ if (element.getValue() != null) {
+ String trimmedVal = element.getValue().trim();
+ if (!trimmedVal.isEmpty()) {
+ return trimmedVal;
+ }
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+
+ private static class EmptyJaasCredentials implements Credentials {
+
+ public String getPassword() {
+ return null;
+ }
+
+ public Principal getUserPrincipal() {
+ return null;
+ }
+
+ }
+
+}
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManagerTest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManagerTest.java
new file mode 100644
index 0000000..8355288
--- /dev/null
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManagerTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.junit.Test;
+
+public class AppCookieManagerTest {
+
+ @Test
+ public void getCachedAppCookie() {
+ assertNull(new AppCookieManager().getCachedAppCookie("http://dummy"));
+ }
+
+ @Test
+ public void getHadoopAuthCookieValueWithNullHeaders() {
+ assertNull(AppCookieManager.getHadoopAuthCookieValue(null));
+ }
+
+ @Test
+ public void getHadoopAuthCookieValueWitEmptylHeaders() {
+ assertNull(AppCookieManager.getHadoopAuthCookieValue(new Header[0]));
+ }
+
+ @Test
+ public void getHadoopAuthCookieValueWithValidlHeaders() {
+ Header[] headers = new Header[1];
+ headers[0] = new BasicHeader("Set-Cookie", AppCookieManager.HADOOP_AUTH + "=dummyvalue");
+ assertNotNull(AppCookieManager.getHadoopAuthCookieValue(headers));
+ }
+
+}