HADOOP-13381. KMS clients should use KMS Delegation Tokens from current UGI. Contributed by Xiao Chen.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
index 7e06ddd..47549f7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
@@ -536,8 +537,12 @@
UserGroupInformation.AuthenticationMethod.PROXY)
? currentUgi.getShortUserName() : null;
- // creating the HTTP connection using the current UGI at constructor time
- conn = actualUgi.doAs(new PrivilegedExceptionAction<HttpURLConnection>() {
+ // If current UGI contains kms-dt && is not proxy, doAs it to use its dt.
+ // Otherwise, create the HTTP connection using the UGI at constructor time
+ UserGroupInformation ugiToUse =
+ (currentUgiContainsKmsDt() && doAsUser == null) ?
+ currentUgi : actualUgi;
+ conn = ugiToUse.doAs(new PrivilegedExceptionAction<HttpURLConnection>() {
@Override
public HttpURLConnection run() throws Exception {
DelegationTokenAuthenticatedURL authUrl =
@@ -1041,6 +1046,20 @@
return dtService;
}
+ private boolean currentUgiContainsKmsDt() throws IOException {
+ // Add existing credentials from current UGI, since provider is cached.
+ Credentials creds = UserGroupInformation.getCurrentUser().
+ getCredentials();
+ if (!creds.getAllTokens().isEmpty()) {
+ org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
+ dToken = creds.getToken(getDelegationTokenService());
+ if (dToken != null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Shutdown valueQueue executor threads
*/
diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
index e3b30a12..61b9a90 100644
--- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
+++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
@@ -38,7 +38,9 @@
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -50,6 +52,8 @@
import javax.security.auth.login.AppConfigurationEntry;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
@@ -62,8 +66,10 @@
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -1877,6 +1883,125 @@
}
@Test
+ public void testDelegationTokensUpdatedInUGI() throws Exception {
+ Configuration conf = new Configuration();
+ UserGroupInformation.setConfiguration(conf);
+ File confDir = getTestDir();
+ conf = createBaseKMSConf(confDir);
+ conf.set(
+ "hadoop.kms.authentication.delegation-token.max-lifetime.sec", "5");
+ conf.set(
+ "hadoop.kms.authentication.delegation-token.renew-interval.sec", "5");
+ writeConf(confDir, conf);
+
+ // Running as a service (e.g. Yarn in practice).
+ runServer(null, null, confDir, new KMSCallable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ final Configuration clientConf = new Configuration();
+ final URI uri = createKMSUri(getKMSUrl());
+ clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
+ createKMSUri(getKMSUrl()).toString());
+ final KeyProvider kp = createProvider(uri, clientConf);
+ final KeyProviderDelegationTokenExtension kpdte =
+ KeyProviderDelegationTokenExtension.
+ createKeyProviderDelegationTokenExtension(kp);
+ final InetSocketAddress kmsAddr =
+ new InetSocketAddress(getKMSUrl().getHost(), getKMSUrl().getPort());
+
+ // Job 1 (e.g. Yarn log aggregation job), with user DT.
+ final Collection<Token<?>> job1Token = new HashSet<>();
+ doAs("client", new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ // Get a DT and use it.
+ final Credentials credentials = new Credentials();
+ kpdte.addDelegationTokens("client", credentials);
+ Assert.assertEquals(1, credentials.getAllTokens().size());
+ Assert.assertEquals(KMSClientProvider.TOKEN_KIND, credentials.
+ getToken(SecurityUtil.buildTokenService(kmsAddr)).getKind());
+ UserGroupInformation.getCurrentUser().addCredentials(credentials);
+ LOG.info("Added kms dt to credentials: {}", UserGroupInformation.
+ getCurrentUser().getCredentials().getAllTokens());
+ Token<?> token =
+ UserGroupInformation.getCurrentUser().getCredentials()
+ .getToken(SecurityUtil.buildTokenService(kmsAddr));
+ Assert.assertNotNull(token);
+ job1Token.add(token);
+
+ // Decode the token to get max time.
+ ByteArrayInputStream buf =
+ new ByteArrayInputStream(token.getIdentifier());
+ DataInputStream dis = new DataInputStream(buf);
+ DelegationTokenIdentifier id =
+ new DelegationTokenIdentifier(token.getKind());
+ id.readFields(dis);
+ dis.close();
+ final long maxTime = id.getMaxDate();
+
+ // wait for token to expire.
+ Thread.sleep(5100);
+ Assert.assertTrue("maxTime " + maxTime + " is not less than now.",
+ maxTime > 0 && maxTime < Time.now());
+ try {
+ kp.getKeys();
+ Assert.fail("Operation should fail since dt is expired.");
+ } catch (Exception e) {
+ LOG.info("Expected error.", e);
+ }
+ return null;
+ }
+ });
+ Assert.assertFalse(job1Token.isEmpty());
+
+ // job 2 (e.g. Another Yarn log aggregation job, with user DT.
+ doAs("client", new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ // Get a new DT, but don't use it yet.
+ final Credentials newCreds = new Credentials();
+ kpdte.addDelegationTokens("client", newCreds);
+ Assert.assertEquals(1, newCreds.getAllTokens().size());
+ Assert.assertEquals(KMSClientProvider.TOKEN_KIND,
+ newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
+ getKind());
+
+ // Using job 1's DT should fail.
+ final Credentials oldCreds = new Credentials();
+ for (Token<?> token : job1Token) {
+ if (token.getKind().equals(KMSClientProvider.TOKEN_KIND)) {
+ oldCreds
+ .addToken(SecurityUtil.buildTokenService(kmsAddr), token);
+ }
+ }
+ UserGroupInformation.getCurrentUser().addCredentials(oldCreds);
+ LOG.info("Added old kms dt to credentials: {}", UserGroupInformation
+ .getCurrentUser().getCredentials().getAllTokens());
+ try {
+ kp.getKeys();
+ Assert.fail("Operation should fail since dt is expired.");
+ } catch (Exception e) {
+ LOG.info("Expected error.", e);
+ }
+
+ // Using the new DT should succeed.
+ Assert.assertEquals(1, newCreds.getAllTokens().size());
+ Assert.assertEquals(KMSClientProvider.TOKEN_KIND,
+ newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
+ getKind());
+ UserGroupInformation.getCurrentUser().addCredentials(newCreds);
+ LOG.info("Credetials now are: {}", UserGroupInformation
+ .getCurrentUser().getCredentials().getAllTokens());
+ kp.getKeys();
+ return null;
+ }
+ });
+ return null;
+ }
+ });
+ }
+
+ @Test
public void testKMSWithZKSigner() throws Exception {
doKMSWithZK(true, false);
}