blob: a0204bef47c279c3c8ffe65ee6818d4b38942c39 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import org.apache.commons.lang.Validate;
import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.fs.azure.security.SpnegoToken;
import org.apache.hadoop.fs.azure.security.WasbDelegationTokenIdentifier;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.Authenticator;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.http.NameValuePair;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
/**
* Helper class the has constants and helper methods
* used in WASB when integrating with a remote http cred
* service which uses Kerberos and delegation tokens.
* Currently, remote service will be used to generate
* SAS keys, authorization and delegation token operations.
*/
public class SecureWasbRemoteCallHelper extends WasbRemoteCallHelper {
public static final Logger LOG =
LoggerFactory.getLogger(SecureWasbRemoteCallHelper.class);
/**
* Delegation token query parameter to be used when making rest call.
*/
private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME = "delegation";
/**
* Delegation token to be used for making the remote call.
*/
private Token<?> delegationToken = null;
/**
* Does Remote Http Call requires Kerberos Authentication always, even if the delegation token is present.
*/
private boolean alwaysRequiresKerberosAuth;
/**
* Enable caching of Spnego token.
*/
private boolean isSpnegoTokenCachingEnabled;
/**
* Cached SPNEGO token.
*/
private SpnegoToken spnegoToken;
public SecureWasbRemoteCallHelper(RetryPolicy retryPolicy,
boolean alwaysRequiresKerberosAuth, boolean isSpnegoTokenCachingEnabled) {
super(retryPolicy);
this.alwaysRequiresKerberosAuth = alwaysRequiresKerberosAuth;
this.isSpnegoTokenCachingEnabled = isSpnegoTokenCachingEnabled;
}
@Override
public String makeRemoteRequest(final String[] urls,
final String path, final List<NameValuePair> queryParams,
final String httpMethod) throws IOException {
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi != null) {
queryParams.add(new NameValuePair() {
@Override public String getName() {
return Constants.DOAS_PARAM;
}
@Override public String getValue() {
return ugi.getShortUserName();
}
});
} else {
connectUgi = ugi;
}
final Token delegationToken = getDelegationToken(ugi);
if (!alwaysRequiresKerberosAuth && delegationToken != null) {
final String delegationTokenEncodedUrlString =
delegationToken.encodeToUrlString();
queryParams.add(new NameValuePair() {
@Override public String getName() {
return DELEGATION_TOKEN_QUERY_PARAM_NAME;
}
@Override public String getValue() {
return delegationTokenEncodedUrlString;
}
});
}
if (delegationToken == null) {
connectUgi.checkTGTAndReloginFromKeytab();
}
String s = null;
try {
s = connectUgi.doAs(new PrivilegedExceptionAction<String>() {
@Override public String run() throws Exception {
return retryableRequest(urls, path, queryParams, httpMethod);
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e.getMessage(), e);
}
return s;
}
@Override
public HttpUriRequest getHttpRequest(String[] urls, String path,
List<NameValuePair> queryParams, int urlIndex, String httpMethod,
boolean requiresNewAuth) throws URISyntaxException, IOException {
URIBuilder uriBuilder =
new URIBuilder(urls[urlIndex]).setPath(path).setParameters(queryParams);
if (uriBuilder.getHost().equals("localhost")) {
uriBuilder.setHost(InetAddress.getLocalHost().getCanonicalHostName());
}
HttpUriRequest httpUriRequest = null;
switch (httpMethod) {
case HttpPut.METHOD_NAME:
httpUriRequest = new HttpPut(uriBuilder.build());
break;
case HttpPost.METHOD_NAME:
httpUriRequest = new HttpPost(uriBuilder.build());
break;
default:
httpUriRequest = new HttpGet(uriBuilder.build());
break;
}
LOG.debug("SecureWasbRemoteCallHelper#getHttpRequest() {}",
uriBuilder.build().toURL());
if (alwaysRequiresKerberosAuth || delegationToken == null) {
AuthenticatedURL.Token token = null;
final Authenticator kerberosAuthenticator =
new KerberosDelegationTokenAuthenticator();
try {
if (isSpnegoTokenCachingEnabled && !requiresNewAuth
&& spnegoToken != null && spnegoToken.isTokenValid()){
token = spnegoToken.getToken();
} else {
token = new AuthenticatedURL.Token();
kerberosAuthenticator.authenticate(uriBuilder.build().toURL(), token);
spnegoToken = new SpnegoToken(token);
}
} catch (AuthenticationException e) {
throw new WasbRemoteCallException(
Constants.AUTHENTICATION_FAILED_ERROR_MESSAGE, e);
}
Validate.isTrue(token.isSet(),
"Authenticated Token is NOT present. The request cannot proceed.");
httpUriRequest.setHeader("Cookie",
AuthenticatedURL.AUTH_COOKIE + "=" + token);
}
return httpUriRequest;
}
private Token<?> getDelegationToken(
UserGroupInformation userGroupInformation) throws IOException {
if (this.delegationToken == null) {
Token<?> token = null;
for (Token iterToken : userGroupInformation.getTokens()) {
if (iterToken.getKind()
.equals(WasbDelegationTokenIdentifier.TOKEN_KIND)) {
token = iterToken;
LOG.debug("{} token found in cache : {}",
WasbDelegationTokenIdentifier.TOKEN_KIND, iterToken);
break;
}
}
LOG.debug("UGI Information: {}", userGroupInformation.toString());
// ugi tokens are usually indicative of a task which can't
// refetch tokens. even if ugi has credentials, don't attempt
// to get another token to match hdfs/rpc behavior
if (token != null) {
LOG.debug("Using UGI token: {}", token);
setDelegationToken(token);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Delegation token from cache - {}", delegationToken != null
? delegationToken.encodeToUrlString()
: "null");
}
return this.delegationToken;
}
private <T extends TokenIdentifier> void setDelegationToken(
final Token<T> token) {
synchronized (this) {
this.delegationToken = token;
}
}
}