YARN-6539. Create SecureLogin inside Router. (#4712)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d42562c..c1fa8c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4107,6 +4107,16 @@
   public static final long DEFAULT_ROUTER_WEBAPP_READ_TIMEOUT =
       TimeUnit.SECONDS.toMillis(30);
 
+  /** The Kerberos keytab for the yarn router.*/
+  public static final String ROUTER_KEYTAB = ROUTER_PREFIX + "keytab.file";
+
+  /** The Kerberos principal for the yarn router.*/
+  public static final String ROUTER_PRINCIPAL = ROUTER_PREFIX + "kerberos.principal";
+
+  /** The Kerberos principal hostname for the yarn router.*/
+  public static final String ROUTER_KERBEROS_PRINCIPAL_HOSTNAME_KEY = ROUTER_PREFIX +
+      "kerberos.principal.hostname";
+
   ////////////////////////////////
   // CSI Volume configs
   ////////////////////////////////
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 407ef74..68d8ed9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -4888,4 +4888,37 @@
       default implementation LocalityAppPlacementAllocator is used.
     </description>
   </property>
+
+  <property>
+    <name>yarn.router.keytab.file</name>
+    <value></value>
+    <description>
+       The keytab file used by router to login as its
+       service principal. The principal name is configured with
+       dfs.federation.router.kerberos.principal.
+    </description>
+  </property>
+
+  <property>
+    <name>yarn.router.kerberos.principal</name>
+    <value></value>
+    <description>
+      The Router service principal. This is typically set to
+      router/_HOST@REALM.TLD. Each Router will substitute _HOST with its
+      own fully qualified hostname at startup. The _HOST placeholder
+      allows using the same configuration setting on both Router setup.
+    </description>
+  </property>
+
+  <property>
+    <name>yarn.router.kerberos.principal.hostname</name>
+    <value></value>
+    <description>
+      Optional.
+      The hostname for the Router containing this
+      configuration file. Will be different for each machine.
+      Defaults to current hostname.
+    </description>
+  </property>
+
 </configuration>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterId.java
index fec967d..7eeb44b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterId.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterId.java
@@ -43,6 +43,14 @@
     return id;
   }
 
+  @Private
+  @Unstable
+  public static SubClusterId newInstance(Integer subClusterId) {
+    SubClusterId id = Records.newRecord(SubClusterId.class);
+    id.setId(String.valueOf(subClusterId));
+    return id;
+  }
+
   /**
    * Get the string identifier of the <em>subcluster</em> which is unique across
    * the federated cluster. The identifier is static, i.e. preserved across
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index fa3c8b8..2882776 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -609,4 +609,10 @@
   protected interface Func<T, TResult> {
     TResult invoke(T input) throws Exception;
   }
+
+
+  @VisibleForTesting
+  public FederationStateStore getStateStore() {
+    return stateStore;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index 61ea864..c70a2db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -382,8 +382,13 @@
   protected Token<AMRMTokenIdentifier> initializeUnmanagedAM(
       ApplicationId appId) throws IOException, YarnException {
     try {
-      UserGroupInformation appSubmitter =
-          UserGroupInformation.createRemoteUser(this.submitter);
+      UserGroupInformation appSubmitter;
+      if (UserGroupInformation.isSecurityEnabled()) {
+        appSubmitter = UserGroupInformation.createProxyUser(this.submitter,
+            UserGroupInformation.getLoginUser());
+      } else {
+        appSubmitter = UserGroupInformation.createRemoteUser(this.submitter);
+      }
       this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf,
           appSubmitter, null);
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index f46a31d..13c062a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -459,8 +459,13 @@
       // Get the running containers from home RM, note that we will also get the
       // AM container itself from here. We don't need it, but no harm to put the
       // map as well.
-      UserGroupInformation appSubmitter = UserGroupInformation
-          .createRemoteUser(getApplicationContext().getUser());
+      UserGroupInformation appSubmitter;
+      if (UserGroupInformation.isSecurityEnabled()) {
+        appSubmitter = UserGroupInformation.createProxyUser(getApplicationContext().getUser(),
+            UserGroupInformation.getLoginUser());
+      } else {
+        appSubmitter = UserGroupInformation.createRemoteUser(getApplicationContext().getUser());
+      }
       ApplicationClientProtocol rmClient =
           createHomeRMProxy(getApplicationContext(),
               ApplicationClientProtocol.class, appSubmitter);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
index d299290..aa80880 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
@@ -116,6 +116,19 @@
       <artifactId>guice</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-auth</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
index fc0b9ee..e95b2567 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
@@ -19,12 +19,15 @@
 package org.apache.hadoop.yarn.server.router;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.util.ShutdownHookManager;
@@ -88,7 +91,8 @@
   }
 
   protected void doSecureLogin() throws IOException {
-    // TODO YARN-6539 Create SecureLogin inside Router
+    SecurityUtil.login(this.conf, YarnConfiguration.ROUTER_KEYTAB,
+        YarnConfiguration.ROUTER_PRINCIPAL, getHostName(this.conf));
   }
 
   @Override
@@ -195,4 +199,31 @@
       System.exit(-1);
     }
   }
+
+  @VisibleForTesting
+  public RouterClientRMService getClientRMProxyService() {
+    return clientRMProxyService;
+  }
+
+  @VisibleForTesting
+  public RouterRMAdminService getRmAdminProxyService() {
+    return rmAdminProxyService;
+  }
+
+  /**
+   * Returns the hostname for this Router. If the hostname is not
+   * explicitly configured in the given config, then it is determined.
+   *
+   * @param config configuration
+   * @return the hostname (NB: may not be a FQDN)
+   * @throws UnknownHostException if the hostname cannot be determined
+   */
+  private String getHostName(Configuration config)
+      throws UnknownHostException {
+    String name = config.get(YarnConfiguration.ROUTER_KERBEROS_PRINCIPAL_HOSTNAME_KEY);
+    if (name == null) {
+      name = InetAddress.getLocalHost().getHostName();
+    }
+    return name;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
index f8dc4f5..961026d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
@@ -106,8 +106,9 @@
     try {
       // Do not create a proxy user if user name matches the user name on
       // current UGI
-      if (userName.equalsIgnoreCase(
-          UserGroupInformation.getCurrentUser().getUserName())) {
+      if (UserGroupInformation.isSecurityEnabled()) {
+        user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
+      } else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
         user = UserGroupInformation.getCurrentUser();
       } else {
         user = UserGroupInformation.createProxyUser(userName,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 947e5f0..9f4c31d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -1623,4 +1623,14 @@
         String.format("Can't Found applicationId = %s in any sub clusters", applicationId);
     throw new YarnException(errorMsg);
   }
+
+  @VisibleForTesting
+  public FederationStateStoreFacade getFederationFacade() {
+    return federationFacade;
+  }
+
+  @VisibleForTesting
+  public Map<SubClusterId, ApplicationClientProtocol> getClientRMProxies() {
+    return clientRMProxies;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
index fad6866..b60a267 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
@@ -444,7 +444,7 @@
   }
 
   @VisibleForTesting
-  protected RequestInterceptorChainWrapper getInterceptorChain()
+  public RequestInterceptorChainWrapper getInterceptorChain()
       throws IOException {
     String user = UserGroupInformation.getCurrentUser().getUserName();
     RequestInterceptorChainWrapper chain = userPipelineMap.get(user);
@@ -553,4 +553,9 @@
       rootInterceptor.shutdown();
     }
   }
+
+  @VisibleForTesting
+  public Map<String, RequestInterceptorChainWrapper> getUserPipelineMap() {
+    return userPipelineMap;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
index 9625eb4..122782a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
@@ -80,8 +80,9 @@
     try {
       // Do not create a proxy user if user name matches the user name on
       // current UGI
-      if (userName.equalsIgnoreCase(
-          UserGroupInformation.getCurrentUser().getUserName())) {
+      if (UserGroupInformation.isSecurityEnabled()) {
+        user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
+      } else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
         user = UserGroupInformation.getCurrentUser();
       } else {
         user = UserGroupInformation.createProxyUser(userName,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java
index 56378d4..8024cdb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java
@@ -157,7 +157,7 @@
   }
 
   @VisibleForTesting
-  protected RequestInterceptorChainWrapper getInterceptorChain()
+  public RequestInterceptorChainWrapper getInterceptorChain()
       throws IOException {
     String user = UserGroupInformation.getCurrentUser().getUserName();
     RequestInterceptorChainWrapper chain = userPipelineMap.get(user);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/secure/AbstractSecureRouterTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/secure/AbstractSecureRouterTest.java
new file mode 100644
index 0000000..61a5e89
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/secure/AbstractSecureRouterTest.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.secure;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
+import org.apache.hadoop.yarn.server.router.Router;
+import org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+
+public abstract class AbstractSecureRouterTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractSecureRouterTest.class);
+
+  ////////////////////////////////
+  // Kerberos Constants
+  ////////////////////////////////
+
+  public static final String REALM = "EXAMPLE.COM";
+  public static final String ROUTER = "router";
+  public static final String LOCALHOST = "localhost";
+  public static final String IP127001 = "127.0.0.1";
+  public static final String ROUTER_LOCALHOST = "router/" + LOCALHOST;
+  public static final String ROUTER_127001 = "router/" + IP127001;
+  public static final String ROUTER_REALM = "router@" + REALM;
+  public static final String ROUTER_LOCALHOST_REALM = ROUTER_LOCALHOST + "@" + REALM;
+  public static final String SUN_SECURITY_KRB5_DEBUG = "sun.security.krb5.debug";
+  public static final String KERBEROS = "kerberos";
+
+  ////////////////////////////////
+  // BeforeSecureRouterTestClass Init
+  ////////////////////////////////
+
+  private static MiniKdc kdc;
+  private static File routerKeytab;
+  private static File kdcWorkDir;
+  private static Configuration conf;
+
+  ////////////////////////////////
+  // Specific Constant
+  // Like Mem, VCore, ClusterNum
+  ////////////////////////////////
+  private static final int NUM_SUBCLUSTER = 4;
+  private static final int GB = 1024;
+  private static final int NM_MEMORY = 8 * GB;
+  private static final int NM_VCORE = 4;
+
+  ////////////////////////////////
+  // Test use in subclasses
+  ////////////////////////////////
+
+  private Router router = null;
+
+  private static ConcurrentHashMap<SubClusterId, MockRM> mockRMs =
+      new ConcurrentHashMap<>();
+
+  @BeforeClass
+  public static void beforeSecureRouterTestClass() throws Exception {
+    // Sets up the KDC and Principals.
+    setupKDCAndPrincipals();
+
+    // Init YarnConfiguration
+    conf = new YarnConfiguration();
+
+    // Enable Kerberos authentication configuration
+    conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, KERBEROS);
+
+    // Router configuration
+    conf.set(YarnConfiguration.ROUTER_BIND_HOST, "0.0.0.0");
+    conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
+        FederationClientInterceptor.class.getName());
+
+    // Router Kerberos KeyTab configuration
+    conf.set(YarnConfiguration.ROUTER_PRINCIPAL, ROUTER_LOCALHOST_REALM);
+    conf.set(YarnConfiguration.ROUTER_KEYTAB, routerKeytab.getAbsolutePath());
+  }
+
+  /**
+   * Sets up the KDC and Principals.
+   *
+   * @throws Exception an error occurred.
+   */
+  public static void setupKDCAndPrincipals() throws Exception {
+    // set up the KDC
+    File target = new File(System.getProperty("test.dir", "target"));
+    kdcWorkDir = new File(target, "kdc");
+    kdcWorkDir.mkdirs();
+    if (!kdcWorkDir.mkdirs()) {
+      assertTrue(kdcWorkDir.isDirectory());
+    }
+    Properties kdcConf = MiniKdc.createConf();
+    kdcConf.setProperty(MiniKdc.DEBUG, "true");
+    kdc = new MiniKdc(kdcConf, kdcWorkDir);
+    kdc.start();
+    routerKeytab = createKeytab(ROUTER, "router.keytab");
+  }
+
+  /**
+   * Initialize RM in safe mode.
+   *
+   * @throws Exception an error occurred.
+   */
+  public static void setupSecureMockRM() throws Exception {
+    for (int i = 0; i < NUM_SUBCLUSTER; i++) {
+      SubClusterId sc = SubClusterId.newInstance(i);
+      if (mockRMs.containsKey(sc)) {
+        continue;
+      }
+      MockRM mockRM = new TestRMRestart.TestSecurityMockRM(conf);
+      mockRM.start();
+      mockRM.registerNode("127.0.0.1:1234", NM_MEMORY, NM_VCORE);
+      mockRMs.put(sc, mockRM);
+    }
+  }
+
+  /**
+   * Create the keytab for the given principal, includes
+   * raw principal and $principal/localhost.
+   *
+   * @param principal principal short name.
+   * @param filename filename of keytab.
+   * @return file of keytab.
+   * @throws Exception an error occurred.
+   */
+  public static File createKeytab(String principal, String filename) throws Exception {
+    assertTrue("empty principal", StringUtils.isNotBlank(principal));
+    assertTrue("empty host", StringUtils.isNotBlank(filename));
+    assertNotNull("null KDC", kdc);
+    File keytab = new File(kdcWorkDir, filename);
+    kdc.createPrincipal(keytab,
+        principal,
+        principal + "/localhost",
+        principal + "/127.0.0.1");
+    return keytab;
+  }
+
+  /**
+   * Start the router in safe mode.
+   *
+   * @throws Exception an error occurred.
+   */
+  public synchronized void startSecureRouter() {
+    assertNull("Router is already running", router);
+    UserGroupInformation.setConfiguration(conf);
+    router = new Router();
+    router.init(conf);
+    router.start();
+  }
+
+  /**
+   * Shut down the KDC service.
+   *
+   * @throws Exception an error occurred.
+   */
+  public static void teardownKDC() throws Exception {
+    if (kdc != null) {
+      kdc.stop();
+      kdc = null;
+    }
+  }
+
+  /**
+   * Stop the router in safe mode.
+   *
+   * @throws Exception an error occurred.
+   */
+  protected synchronized void stopSecureRouter() throws Exception {
+    if (router != null) {
+      router.stop();
+      router = null;
+    }
+  }
+
+  /**
+   * Stop the entire test service.
+   *
+   * @throws Exception an error occurred.
+   */
+  @AfterClass
+  public static void afterSecureRouterTest() throws Exception {
+    LOG.info("teardown of kdc instance.");
+    teardownKDC();
+  }
+
+  public static MiniKdc getKdc() {
+    return kdc;
+  }
+
+  public Router getRouter() {
+    return router;
+  }
+
+  public static ConcurrentHashMap<SubClusterId, MockRM> getMockRMs() {
+    return mockRMs;
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/secure/TestSecureLogins.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/secure/TestSecureLogins.java
new file mode 100644
index 0000000..4091181
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/secure/TestSecureLogins.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.secure;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.router.Router;
+import org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor;
+import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
+import org.apache.hadoop.yarn.server.router.rmadmin.DefaultRMAdminRequestInterceptor;
+import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class TestSecureLogins extends AbstractSecureRouterTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestSecureLogins.class);
+
+  @Test
+  public void testHasRealm() throws Throwable {
+    Assert.assertNotNull(getRealm());
+    LOG.info("Router principal = {}", getPrincipalAndRealm(ROUTER_LOCALHOST));
+  }
+
+  @Test
+  public void testRouterSecureLogin() throws Exception {
+    startSecureRouter();
+
+    List<Service> services = this.getRouter().getServices();
+    Assert.assertNotNull(services);
+    Assert.assertEquals(3, services.size());
+
+    stopSecureRouter();
+  }
+
+  @Test
+  public void testRouterClientRMService() throws Exception {
+    // Start the Router in Secure Mode
+    startSecureRouter();
+
+    // Start RM and RouterClientRMService in Secure mode
+    setupSecureMockRM();
+    initRouterClientRMService();
+
+    // Test the simple rpc call of the Router in the Secure environment
+    RouterClientRMService routerClientRMService = this.getRouter().getClientRMProxyService();
+    GetClusterMetricsRequest metricsRequest = GetClusterMetricsRequest.newInstance();
+    GetClusterMetricsResponse metricsResponse =
+        routerClientRMService.getClusterMetrics(metricsRequest);
+    Assert.assertNotNull(metricsResponse);
+    YarnClusterMetrics clusterMetrics = metricsResponse.getClusterMetrics();
+    Assert.assertEquals(4, clusterMetrics.getNumNodeManagers());
+    Assert.assertEquals(0, clusterMetrics.getNumLostNodeManagers());
+
+    // Stop the Router in Secure Mode
+    stopSecureRouter();
+  }
+
+  @Test
+  public void testRouterRMAdminService() throws Exception {
+    // Start the Router in Secure Mode
+    startSecureRouter();
+
+    // Start RM and RouterClientRMService in Secure mode
+    setupSecureMockRM();
+    initRouterRMAdminService();
+
+    // Test the simple rpc call of the Router in the Secure environment
+    RouterRMAdminService routerRMAdminService = this.getRouter().getRmAdminProxyService();
+    RefreshNodesRequest refreshNodesRequest = RefreshNodesRequest.newInstance();
+    RefreshNodesResponse refreshNodesResponse =
+        routerRMAdminService.refreshNodes(refreshNodesRequest);
+    Assert.assertNotNull(refreshNodesResponse);
+
+    // Stop the Router in Secure Mode
+    stopSecureRouter();
+  }
+
+  public static String getPrincipalAndRealm(String principal) {
+    return principal + "@" + getRealm();
+  }
+
+  protected static String getRealm() {
+    return getKdc().getRealm();
+  }
+
+  private void initRouterClientRMService() throws Exception {
+    Router router = this.getRouter();
+    Map<SubClusterId, MockRM> mockRMs = getMockRMs();
+
+    RouterClientRMService rmService = router.getClientRMProxyService();
+    RouterClientRMService.RequestInterceptorChainWrapper wrapper = rmService.getInterceptorChain();
+    FederationClientInterceptor interceptor =
+        (FederationClientInterceptor) wrapper.getRootInterceptor();
+    FederationStateStoreFacade stateStoreFacade = interceptor.getFederationFacade();
+    FederationStateStore stateStore = stateStoreFacade.getStateStore();
+    FederationStateStoreTestUtil stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
+    Map<SubClusterId, ApplicationClientProtocol> clientRMProxies = interceptor.getClientRMProxies();
+
+    if (MapUtils.isNotEmpty(mockRMs)) {
+      for (Map.Entry<SubClusterId, MockRM> entry : mockRMs.entrySet()) {
+        SubClusterId sc = entry.getKey();
+        MockRM mockRM = entry.getValue();
+        stateStoreUtil.registerSubCluster(sc);
+        if (clientRMProxies.containsKey(sc)) {
+          continue;
+        }
+        clientRMProxies.put(sc, mockRM.getClientRMService());
+      }
+    }
+  }
+
+  private void initRouterRMAdminService() throws Exception {
+    Router router = this.getRouter();
+    Map<SubClusterId, MockRM> mockRMs = getMockRMs();
+    SubClusterId sc = SubClusterId.newInstance(0);
+    MockRM mockRM = mockRMs.get(sc);
+    RouterRMAdminService routerRMAdminService = router.getRmAdminProxyService();
+    RouterRMAdminService.RequestInterceptorChainWrapper rmAdminChainWrapper =
+        routerRMAdminService.getInterceptorChain();
+    DefaultRMAdminRequestInterceptor rmInterceptor =
+        (DefaultRMAdminRequestInterceptor) rmAdminChainWrapper.getRootInterceptor();
+    rmInterceptor.setRMAdmin(mockRM.getAdminService());
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
index c7836e7..31a0718 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
@@ -258,6 +258,16 @@
 |`yarn.federation.cache-ttl.secs` | `60` | The Router caches informations, and this is the time to leave before the cache is invalidated. |
 |`yarn.router.webapp.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client via REST interface. The last step of this pipeline must be the Federation Interceptor REST. |
 
+Security:
+
+Kerberos supported in federation.
+
+| Property | Example | Description |
+|:---- |:---- |
+| `yarn.router.keytab.file` |  | The keytab file used by router to login as its service principal. The principal name is configured with 'yarn.router.kerberos.principal'.|
+| `yarn.router.kerberos.principal` | | The Router service principal. This is typically set to router/_HOST@REALM.TLD. Each Router will substitute _HOST with its own fully qualified hostname at startup. The _HOST placeholder allows using the same configuration setting on all Routers in setup. |
+| `yarn.router.kerberos.principal.hostname` |  | Optional. The hostname for the Router containing this configuration file.  Will be different for each machine. Defaults to current hostname. |
+
 ###ON NMs:
 
 These are extra configurations that should appear in the **conf/yarn-site.xml** at each NodeManager.