HADOOP-13381. KMS clients should use KMS Delegation Tokens from current UGI. Contributed by Xiao Chen.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
index 7e06ddd..47549f7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
@@ -536,8 +537,12 @@
           UserGroupInformation.AuthenticationMethod.PROXY)
                               ? currentUgi.getShortUserName() : null;
 
-      // creating the HTTP connection using the current UGI at constructor time
-      conn = actualUgi.doAs(new PrivilegedExceptionAction<HttpURLConnection>() {
+      // If current UGI contains kms-dt && is not proxy, doAs it to use its dt.
+      // Otherwise, create the HTTP connection using the UGI at constructor time
+      UserGroupInformation ugiToUse =
+          (currentUgiContainsKmsDt() && doAsUser == null) ?
+              currentUgi : actualUgi;
+      conn = ugiToUse.doAs(new PrivilegedExceptionAction<HttpURLConnection>() {
         @Override
         public HttpURLConnection run() throws Exception {
           DelegationTokenAuthenticatedURL authUrl =
@@ -1041,6 +1046,20 @@
     return dtService;
   }
 
+  private boolean currentUgiContainsKmsDt() throws IOException {
+    // Add existing credentials from current UGI, since provider is cached.
+    Credentials creds = UserGroupInformation.getCurrentUser().
+        getCredentials();
+    if (!creds.getAllTokens().isEmpty()) {
+      org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
+          dToken = creds.getToken(getDelegationTokenService());
+      if (dToken != null) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Shutdown valueQueue executor threads
    */
diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
index e3b30a12..61b9a90 100644
--- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
+++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
@@ -38,7 +38,9 @@
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -50,6 +52,8 @@
 
 import javax.security.auth.login.AppConfigurationEntry;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -62,8 +66,10 @@
 import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -1877,6 +1883,125 @@
   }
 
   @Test
+  public void testDelegationTokensUpdatedInUGI() throws Exception {
+    Configuration conf = new Configuration();
+    UserGroupInformation.setConfiguration(conf);
+    File confDir = getTestDir();
+    conf = createBaseKMSConf(confDir);
+    conf.set(
+        "hadoop.kms.authentication.delegation-token.max-lifetime.sec", "5");
+    conf.set(
+        "hadoop.kms.authentication.delegation-token.renew-interval.sec", "5");
+    writeConf(confDir, conf);
+
+    // Running as a service (e.g. Yarn in practice).
+    runServer(null, null, confDir, new KMSCallable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final Configuration clientConf = new Configuration();
+        final URI uri = createKMSUri(getKMSUrl());
+        clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
+            createKMSUri(getKMSUrl()).toString());
+        final KeyProvider kp = createProvider(uri, clientConf);
+        final KeyProviderDelegationTokenExtension kpdte =
+            KeyProviderDelegationTokenExtension.
+                createKeyProviderDelegationTokenExtension(kp);
+        final InetSocketAddress kmsAddr =
+            new InetSocketAddress(getKMSUrl().getHost(), getKMSUrl().getPort());
+
+        // Job 1 (e.g. Yarn log aggregation job), with user DT.
+        final Collection<Token<?>> job1Token = new HashSet<>();
+        doAs("client", new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            // Get a DT and use it.
+            final Credentials credentials = new Credentials();
+            kpdte.addDelegationTokens("client", credentials);
+            Assert.assertEquals(1, credentials.getAllTokens().size());
+            Assert.assertEquals(KMSClientProvider.TOKEN_KIND, credentials.
+                getToken(SecurityUtil.buildTokenService(kmsAddr)).getKind());
+            UserGroupInformation.getCurrentUser().addCredentials(credentials);
+            LOG.info("Added kms dt to credentials: {}", UserGroupInformation.
+                getCurrentUser().getCredentials().getAllTokens());
+            Token<?> token =
+                UserGroupInformation.getCurrentUser().getCredentials()
+                    .getToken(SecurityUtil.buildTokenService(kmsAddr));
+            Assert.assertNotNull(token);
+            job1Token.add(token);
+
+            // Decode the token to get max time.
+            ByteArrayInputStream buf =
+                new ByteArrayInputStream(token.getIdentifier());
+            DataInputStream dis = new DataInputStream(buf);
+            DelegationTokenIdentifier id =
+                new DelegationTokenIdentifier(token.getKind());
+            id.readFields(dis);
+            dis.close();
+            final long maxTime = id.getMaxDate();
+
+            // wait for token to expire.
+            Thread.sleep(5100);
+            Assert.assertTrue("maxTime " + maxTime + " is not less than now.",
+                maxTime > 0 && maxTime < Time.now());
+            try {
+              kp.getKeys();
+              Assert.fail("Operation should fail since dt is expired.");
+            } catch (Exception e) {
+              LOG.info("Expected error.", e);
+            }
+            return null;
+          }
+        });
+        Assert.assertFalse(job1Token.isEmpty());
+
+        // job 2 (e.g. Another Yarn log aggregation job, with user DT.
+        doAs("client", new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            // Get a new DT, but don't use it yet.
+            final Credentials newCreds = new Credentials();
+            kpdte.addDelegationTokens("client", newCreds);
+            Assert.assertEquals(1, newCreds.getAllTokens().size());
+            Assert.assertEquals(KMSClientProvider.TOKEN_KIND,
+                newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
+                    getKind());
+
+            // Using job 1's DT should fail.
+            final Credentials oldCreds = new Credentials();
+            for (Token<?> token : job1Token) {
+              if (token.getKind().equals(KMSClientProvider.TOKEN_KIND)) {
+                oldCreds
+                    .addToken(SecurityUtil.buildTokenService(kmsAddr), token);
+              }
+            }
+            UserGroupInformation.getCurrentUser().addCredentials(oldCreds);
+            LOG.info("Added old kms dt to credentials: {}", UserGroupInformation
+                .getCurrentUser().getCredentials().getAllTokens());
+            try {
+              kp.getKeys();
+              Assert.fail("Operation should fail since dt is expired.");
+            } catch (Exception e) {
+              LOG.info("Expected error.", e);
+            }
+
+            // Using the new DT should succeed.
+            Assert.assertEquals(1, newCreds.getAllTokens().size());
+            Assert.assertEquals(KMSClientProvider.TOKEN_KIND,
+                newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
+                    getKind());
+            UserGroupInformation.getCurrentUser().addCredentials(newCreds);
+            LOG.info("Credetials now are: {}", UserGroupInformation
+                .getCurrentUser().getCredentials().getAllTokens());
+            kp.getKeys();
+            return null;
+          }
+        });
+        return null;
+      }
+    });
+  }
+
+  @Test
   public void testKMSWithZKSigner() throws Exception {
     doKMSWithZK(true, false);
   }