HDFS-4043. Namenode Kerberos Login does not use proper hostname for host qualified hdfs principal name. (#4693)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 75b19e2..9d62243 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -489,4 +489,7 @@
   public static final boolean IOSTATISTICS_THREAD_LEVEL_ENABLED_DEFAULT =
       true;
 
+  public static final String HADOOP_SECURITY_RESOLVER_IMPL =
+      "hadoop.security.resolver.impl";
+
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSDomainNameResolver.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSDomainNameResolver.java
index 5866e29..ce962bf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSDomainNameResolver.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSDomainNameResolver.java
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.net;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.naming.NamingException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
@@ -27,6 +31,10 @@
  * fully qualified domain names belonging to the IPs from this host name
  */
 public class DNSDomainNameResolver implements DomainNameResolver {
+
+  private final static Logger LOG =
+      LoggerFactory.getLogger(DNSDomainNameResolver.class.getName());
+
   @Override
   public InetAddress[] getAllByDomainName(String domainName)
       throws UnknownHostException {
@@ -40,6 +48,16 @@
         && host.charAt(host.length()-1) == '.') {
       host = host.substring(0, host.length()-1);
     }
+    // Protect against the Java behaviour of returning the IP address as a string from a cache
+    // instead of performing a reverse lookup.
+    if (host != null && host.equals(address.getHostAddress())) {
+      LOG.debug("IP address returned for FQDN detected: {}", address.getHostAddress());
+      try {
+        return DNS.reverseDns(address, null);
+      } catch (NamingException lookupFailure) {
+        LOG.warn("Failed to perform reverse lookup: {}", address);
+      }
+    }
     return host;
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
index 2b9822a..3369869 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
@@ -44,6 +44,8 @@
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.net.DomainNameResolver;
+import org.apache.hadoop.net.DomainNameResolverFactory;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.token.Token;
@@ -81,6 +83,8 @@
   @VisibleForTesting
   static HostResolver hostResolver;
 
+  private static DomainNameResolver domainNameResolver;
+
   private static boolean logSlowLookups;
   private static int slowLookupThresholdMs;
 
@@ -112,6 +116,9 @@
             .HADOOP_SECURITY_DNS_LOG_SLOW_LOOKUPS_THRESHOLD_MS_KEY,
         CommonConfigurationKeys
             .HADOOP_SECURITY_DNS_LOG_SLOW_LOOKUPS_THRESHOLD_MS_DEFAULT);
+
+    domainNameResolver = DomainNameResolverFactory.newInstance(conf,
+        CommonConfigurationKeys.HADOOP_SECURITY_RESOLVER_IMPL);
   }
 
   /**
@@ -212,7 +219,7 @@
         throw new IOException("Can't replace " + HOSTNAME_PATTERN
             + " pattern since client address is null");
       }
-      return replacePattern(components, addr.getCanonicalHostName());
+      return replacePattern(components, domainNameResolver.getHostnameByIP(addr));
     }
   }
   
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 5a1c09f..ec50c1e 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -131,6 +131,14 @@
 </property>
 
 <property>
+  <name>hadoop.security.resolver.impl</name>
+  <value>org.apache.hadoop.net.DNSDomainNameResolver</value>
+  <description>
+    The resolver implementation used to resolve FQDN for Kerberos
+  </description>
+</property>
+
+<property>
   <name>hadoop.security.dns.log-slow-lookups.enabled</name>
   <value>false</value>
   <description>
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestDNSDomainNameResolver.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestDNSDomainNameResolver.java
new file mode 100644
index 0000000..4729cee
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestDNSDomainNameResolver.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assume.assumeFalse;
+
+
+public class TestDNSDomainNameResolver {
+
+  static DNSDomainNameResolver DNR = new DNSDomainNameResolver();
+
+  @Test
+  public void testGetHostNameByIP() throws UnknownHostException {
+    InetAddress localhost = InetAddress.getLocalHost();
+    assumeFalse("IP lookup support required",
+        Objects.equals(localhost.getCanonicalHostName(), localhost.getHostAddress()));
+
+    // Precondition: host name and canonical host name for unresolved returns an IP address.
+    InetAddress unresolved = InetAddress.getByAddress(localhost.getHostAddress(),
+        localhost.getAddress());
+    assertEquals(localhost.getHostAddress(), unresolved.getHostName());
+
+    // Test: Get the canonical name despite InetAddress caching
+    String canonicalHostName = DNR.getHostnameByIP(unresolved);
+
+    // Verify: The canonical host name doesn't match the host address but does match the localhost.
+    assertNotEquals(localhost.getHostAddress(), canonicalHostName);
+    assertEquals(localhost.getCanonicalHostName(), canonicalHostName);
+  }
+
+}