ZOOKEEPER-4986: Disable reverse DNS lookup in TLS client and server

Reviewers: kezhuw
Author: anmolnar
Closes #2325 from anmolnar/ZOOKEEPER-4986
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index b14cb6b..fea20e5 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -1764,6 +1764,16 @@
     This option requires the corresponding *hostnameVerification* option to be `true`, or it will be ignored.
     Default: true for quorum, false for clients
 
+* *ssl.allowReverseDnsLookup* and *ssl.quorum.allowReverseDnsLookup* :
+    (Java system properties: **zookeeper.ssl.allowReverseDnsLookup** and **zookeeper.ssl.quorum.allowReverseDnsLookup**)
+    **New in 3.9.5:**
+    Allow reverse DNS lookup in both server- and client hostname verifications if the hostname verification is enabled in
+    `ZKTrustManager`. Supported in both quorum and client TLS protocols. Not supported in FIPS mode. Reverse DNS lookups are
+    expensive and unnecessary in most cases. Make sure that certificates are created with all required Subject Alternative
+    Names (SAN) for successful identity verification. It's recommended to add SAN:IP entries for identity verification
+    of client certificates.
+    Default: false
+
 * *ssl.crl* and *ssl.quorum.crl* :
     (Java system properties: **zookeeper.ssl.crl** and **zookeeper.ssl.quorum.crl**)
     **New in 3.5.5:**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/ClientX509Util.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/ClientX509Util.java
index 16d7f27..b035bbf 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/common/ClientX509Util.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/ClientX509Util.java
@@ -206,6 +206,7 @@ private TrustManager getTrustManager(ZKConfig config) throws X509Exception.Trust
         boolean sslOcspEnabled = config.getBoolean(getSslOcspEnabledProperty(), Boolean.parseBoolean(Security.getProperty("ocsp.enable")));
         boolean sslServerHostnameVerificationEnabled = isServerHostnameVerificationEnabled(config);
         boolean sslClientHostnameVerificationEnabled = isClientHostnameVerificationEnabled(config);
+        boolean allowReverseDnsLookup = allowReverseDnsLookup(config);
 
         if (trustStoreLocation.isEmpty()) {
             LOG.warn("{} not specified", getSslTruststoreLocationProperty());
@@ -213,7 +214,8 @@ private TrustManager getTrustManager(ZKConfig config) throws X509Exception.Trust
         } else {
             return createTrustManager(trustStoreLocation, trustStorePassword, trustStoreType,
                 sslCrlEnabled, sslOcspEnabled, sslServerHostnameVerificationEnabled,
-                sslClientHostnameVerificationEnabled, getFipsMode(config));
+                sslClientHostnameVerificationEnabled, allowReverseDnsLookup,
+                getFipsMode(config));
         }
     }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java
index 2584fec..2ac1ad9 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java
@@ -166,6 +166,7 @@ public io.netty.handler.ssl.ClientAuth toNettyClientAuth() {
     private final String sslContextSupplierClassProperty = getConfigPrefix() + "context.supplier.class";
     private final String sslHostnameVerificationEnabledProperty = getConfigPrefix() + "hostnameVerification";
     private final String sslClientHostnameVerificationEnabledProperty = getConfigPrefix() + "clientHostnameVerification";
+    private final String sslAllowReverseDnsLookupProperty = getConfigPrefix() + "allowReverseDnsLookup";
     private final String sslCrlEnabledProperty = getConfigPrefix() + "crl";
     private final String sslOcspEnabledProperty = getConfigPrefix() + "ocsp";
     private final String sslClientAuthProperty = getConfigPrefix() + "clientAuth";
@@ -244,6 +245,10 @@ public String getSslClientHostnameVerificationEnabledProperty() {
         return sslClientHostnameVerificationEnabledProperty;
     }
 
+    public String getSslAllowReverseDnsLookupProperty() {
+        return sslAllowReverseDnsLookupProperty;
+    }
+
     public String getSslCrlEnabledProperty() {
         return sslCrlEnabledProperty;
     }
@@ -283,6 +288,10 @@ public boolean isClientHostnameVerificationEnabled(ZKConfig config) {
             && config.getBoolean(this.getSslClientHostnameVerificationEnabledProperty(), shouldVerifyClientHostname());
     }
 
+    public boolean allowReverseDnsLookup(ZKConfig config) {
+        return config.getBoolean(this.getSslAllowReverseDnsLookupProperty(), false);
+    }
+
     public SSLContext getDefaultSSLContext() throws X509Exception.SSLContextException {
         return getDefaultSSLContextAndOptions().getSSLContext();
     }
@@ -401,6 +410,7 @@ public SSLContextAndOptions createSSLContextAndOptionsFromConfig(ZKConfig config
 
         boolean sslServerHostnameVerificationEnabled = isServerHostnameVerificationEnabled(config);
         boolean sslClientHostnameVerificationEnabled = isClientHostnameVerificationEnabled(config);
+        boolean allowReverseDnsLookup = allowReverseDnsLookup(config);
         boolean fipsMode = getFipsMode(config);
 
         if (trustStoreLocationProp.isEmpty()) {
@@ -410,7 +420,7 @@ public SSLContextAndOptions createSSLContextAndOptionsFromConfig(ZKConfig config
                 trustManagers = new TrustManager[]{
                     createTrustManager(trustStoreLocationProp, trustStorePasswordProp, trustStoreTypeProp, sslCrlEnabled,
                         sslOcspEnabled, sslServerHostnameVerificationEnabled, sslClientHostnameVerificationEnabled,
-                        fipsMode)};
+                        allowReverseDnsLookup, fipsMode)};
             } catch (TrustManagerException trustManagerException) {
                 throw new SSLContextException("Failed to create TrustManager", trustManagerException);
             } catch (IllegalArgumentException e) {
@@ -546,6 +556,7 @@ public static X509TrustManager createTrustManager(
         boolean ocspEnabled,
         final boolean serverHostnameVerificationEnabled,
         final boolean clientHostnameVerificationEnabled,
+        final boolean allowReverseDnsLookup,
         final boolean fipsMode) throws TrustManagerException {
         if (trustStorePassword == null) {
             trustStorePassword = "";
@@ -604,7 +615,7 @@ public static X509TrustManager createTrustManager(
                         LOG.debug("FIPS mode is OFF: creating ZKTrustManager");
                     }
                     return new ZKTrustManager((X509ExtendedTrustManager) tm, serverHostnameVerificationEnabled,
-                        clientHostnameVerificationEnabled);
+                        clientHostnameVerificationEnabled, allowReverseDnsLookup);
                 }
             }
             throw new TrustManagerException("Couldn't find X509TrustManager");
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/ZKConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/ZKConfig.java
index 47fd943..f6221ee 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/common/ZKConfig.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/ZKConfig.java
@@ -151,6 +151,7 @@ private void putSSLProperties(X509Util x509Util) {
         properties.put(x509Util.getSslContextSupplierClassProperty(), System.getProperty(x509Util.getSslContextSupplierClassProperty()));
         properties.put(x509Util.getSslClientHostnameVerificationEnabledProperty(), System.getProperty(x509Util.getSslClientHostnameVerificationEnabledProperty()));
         properties.put(x509Util.getSslHostnameVerificationEnabledProperty(), System.getProperty(x509Util.getSslHostnameVerificationEnabledProperty()));
+        properties.put(x509Util.getSslAllowReverseDnsLookupProperty(), System.getProperty(x509Util.getSslAllowReverseDnsLookupProperty()));
         properties.put(x509Util.getSslCrlEnabledProperty(), System.getProperty(x509Util.getSslCrlEnabledProperty()));
         properties.put(x509Util.getSslOcspEnabledProperty(), System.getProperty(x509Util.getSslOcspEnabledProperty()));
         properties.put(x509Util.getSslClientAuthProperty(), System.getProperty(x509Util.getSslClientAuthProperty()));
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/ZKTrustManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/ZKTrustManager.java
index cbadd1e..e2af9f6 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/common/ZKTrustManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/ZKTrustManager.java
@@ -42,6 +42,7 @@ public class ZKTrustManager extends X509ExtendedTrustManager {
     private final X509ExtendedTrustManager x509ExtendedTrustManager;
     private final boolean serverHostnameVerificationEnabled;
     private final boolean clientHostnameVerificationEnabled;
+    private final boolean allowReverseDnsLookup;
 
     private final ZKHostnameVerifier hostnameVerifier;
 
@@ -57,22 +58,26 @@ public class ZKTrustManager extends X509ExtendedTrustManager {
     ZKTrustManager(
         X509ExtendedTrustManager x509ExtendedTrustManager,
         boolean serverHostnameVerificationEnabled,
-        boolean clientHostnameVerificationEnabled) {
+        boolean clientHostnameVerificationEnabled,
+        boolean allowReverseDnsLookup) {
         this(x509ExtendedTrustManager,
                 serverHostnameVerificationEnabled,
                 clientHostnameVerificationEnabled,
-                new ZKHostnameVerifier());
+                new ZKHostnameVerifier(),
+                allowReverseDnsLookup);
     }
 
     ZKTrustManager(
             X509ExtendedTrustManager x509ExtendedTrustManager,
             boolean serverHostnameVerificationEnabled,
             boolean clientHostnameVerificationEnabled,
-            ZKHostnameVerifier hostnameVerifier) {
+            ZKHostnameVerifier hostnameVerifier,
+            boolean allowReverseDnsLookup) {
         this.x509ExtendedTrustManager = x509ExtendedTrustManager;
         this.serverHostnameVerificationEnabled = serverHostnameVerificationEnabled;
         this.clientHostnameVerificationEnabled = clientHostnameVerificationEnabled;
         this.hostnameVerifier = hostnameVerifier;
+        this.allowReverseDnsLookup = allowReverseDnsLookup;
     }
 
     @Override
@@ -166,31 +171,46 @@ public void checkServerTrusted(X509Certificate[] chain, String authType) throws
      * @param certificate Peer's certificate
      * @throws CertificateException Thrown if the provided certificate doesn't match the peer hostname.
      */
-    private void performHostVerification(
-        InetAddress inetAddress,
-        X509Certificate certificate
-    ) throws CertificateException {
+    private void performHostVerification(InetAddress inetAddress, X509Certificate certificate)
+        throws CertificateException {
         String hostAddress = "";
         String hostName = "";
         try {
             hostAddress = inetAddress.getHostAddress();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Trying to verify host address first: {}", hostAddress);
-            }
             hostnameVerifier.verify(hostAddress, certificate);
         } catch (SSLException addressVerificationException) {
+            // If we fail with hostAddress, we should try the hostname.
+            // The inetAddress may have been created with a hostname, in which case getHostName() will
+            // return quickly below. If not, a reverse lookup will happen, which can be expensive.
+            // We provide the option to skip the reverse lookup if preferring to fail fast.
+
+            // Handle logging here to aid debugging. The easiest way to check for an existing
+            // hostname is through toString, see javadoc.
+            String inetAddressString = inetAddress.toString();
+            if (!inetAddressString.startsWith("/")) {
+                LOG.debug(
+                    "Failed to verify host address: {}, but inetAddress {} has a hostname, trying that",
+                    hostAddress, inetAddressString, addressVerificationException);
+            } else if (allowReverseDnsLookup) {
+                LOG.debug(
+                    "Failed to verify host address: {}, attempting to verify host name with reverse dns",
+                    hostAddress, addressVerificationException);
+            } else {
+                LOG.debug("Failed to verify host address: {}, but reverse dns lookup is disabled",
+                    hostAddress, addressVerificationException);
+                throw new CertificateException(
+                    "Failed to verify host address, and reverse lookup is disabled",
+                    addressVerificationException);
+            }
+
             try {
                 hostName = inetAddress.getHostName();
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(
-                        "Failed to verify host address: {}, trying to verify host name: {}",
-                        hostAddress, hostName);
-                }
                 hostnameVerifier.verify(hostName, certificate);
             } catch (SSLException hostnameVerificationException) {
                 LOG.error("Failed to verify host address: {}", hostAddress, addressVerificationException);
                 LOG.error("Failed to verify hostname: {}", hostName, hostnameVerificationException);
-                throw new CertificateException("Failed to verify both host address and host name", hostnameVerificationException);
+                throw new CertificateException("Failed to verify both host address and host name",
+                    hostnameVerificationException);
             }
         }
     }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/X509AuthenticationProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/X509AuthenticationProvider.java
index 8286c06..befdecd 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/X509AuthenticationProvider.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/X509AuthenticationProvider.java
@@ -89,6 +89,7 @@ public X509AuthenticationProvider() throws X509Exception {
             boolean crlEnabled = config.getBoolean(x509Util.getSslCrlEnabledProperty(), Boolean.getBoolean("com.sun.net.ssl.checkRevocation"));
             boolean ocspEnabled = config.getBoolean(x509Util.getSslOcspEnabledProperty(), Boolean.parseBoolean(Security.getProperty("ocsp.enable")));
             boolean hostnameVerificationEnabled = Boolean.parseBoolean(config.getProperty(x509Util.getSslHostnameVerificationEnabledProperty()));
+            boolean allowReverseDnsLookup = Boolean.parseBoolean(config.getProperty(x509Util.getSslAllowReverseDnsLookupProperty()));
 
             X509KeyManager km = null;
             X509TrustManager tm = null;
@@ -121,6 +122,7 @@ public X509AuthenticationProvider() throws X509Exception {
                         ocspEnabled,
                         hostnameVerificationEnabled,
                         false,
+                        allowReverseDnsLookup,
                         fipsMode);
                 } catch (TrustManagerException e) {
                     LOG.error("Failed to create trust manager", e);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/common/X509UtilTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/common/X509UtilTest.java
index ff054c1..1c5104e 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/common/X509UtilTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/common/X509UtilTest.java
@@ -332,6 +332,7 @@ public void testLoadPEMTrustStore(
             false,
             true,
             true,
+            false,
             false);
     }
 
@@ -353,6 +354,7 @@ public void testLoadPEMTrustStoreNullPassword(
             false,
             true,
             true,
+            false,
             false);
 
     }
@@ -372,6 +374,7 @@ public void testLoadPEMTrustStoreAutodetectStoreFileType(
             false,
             true,
             true,
+            false,
             false);
     }
 
@@ -447,6 +450,7 @@ public void testLoadJKSTrustStore(
             true,
             true,
             true,
+            false,
             false);
     }
 
@@ -468,6 +472,7 @@ public void testLoadJKSTrustStoreNullPassword(
             false,
             true,
             true,
+            false,
             false);
     }
 
@@ -486,6 +491,7 @@ public void testLoadJKSTrustStoreAutodetectStoreFileType(
             true,
             true,
             true,
+            false,
             false);
     }
 
@@ -505,6 +511,7 @@ public void testLoadJKSTrustStoreWithWrongPassword(
                     true,
                     true,
                     true,
+                    false,
                     false);
         });
     }
@@ -580,6 +587,7 @@ public void testLoadPKCS12TrustStore(
             true,
             true,
             true,
+            false,
             false);
     }
 
@@ -601,6 +609,7 @@ public void testLoadPKCS12TrustStoreNullPassword(
             false,
             true,
             true,
+            false,
             false);
     }
 
@@ -619,6 +628,7 @@ public void testLoadPKCS12TrustStoreAutodetectStoreFileType(
             true,
             true,
             true,
+            false,
             false);
     }
 
@@ -638,6 +648,7 @@ public void testLoadPKCS12TrustStoreWithWrongPassword(
                     true,
                     true,
                     true,
+                    false,
                     false);
         });
     }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/common/ZKTrustManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/common/ZKTrustManagerTest.java
index 138e45e..568f2ef 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/common/ZKTrustManagerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/common/ZKTrustManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.zookeeper.common;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -27,13 +28,16 @@
 import java.math.BigInteger;
 import java.net.InetAddress;
 import java.net.Socket;
+import java.net.UnknownHostException;
 import java.security.KeyPair;
 import java.security.KeyPairGenerator;
 import java.security.Security;
+import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
+import java.util.Collections;
 import java.util.Date;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -61,10 +65,10 @@
 import org.burningwave.tools.net.HostResolutionRequestInterceptor;
 import org.burningwave.tools.net.MappedHostResolver;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,16 +117,14 @@ public static void removeBouncyCastleProvider() throws Exception {
     @BeforeEach
     public void setup() throws Exception {
         mockX509ExtendedTrustManager = mock(X509ExtendedTrustManager.class);
+        mockSocket = createSocketWithHostname();
+    }
 
-        InetAddress mockInetAddress = InetAddress.getByName(HOSTNAME);
-
-        mockSocket = mock(Socket.class);
-        when(mockSocket.getInetAddress()).thenAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                return mockInetAddress;
-            }
-        });
+    @AfterEach
+    public void tearDown() throws Exception {
+        if (mockSocket != null) {
+            mockSocket.close();
+        }
     }
 
     private X509Certificate[] createSelfSignedCertificateChain(String ipAddress, String hostname) throws Exception {
@@ -161,7 +163,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
     public void testServerHostnameVerificationWithHostnameVerificationDisabled() throws Exception {
         VerifiableHostnameVerifier hostnameVerifier = new VerifiableHostnameVerifier();
         ZKTrustManager zkTrustManager = new ZKTrustManager(mockX509ExtendedTrustManager, false, false,
-                hostnameVerifier);
+                hostnameVerifier, false);
 
         X509Certificate[] certificateChain = createSelfSignedCertificateChain(IP_ADDRESS, HOSTNAME);
         zkTrustManager.checkServerTrusted(certificateChain, null, mockSocket);
@@ -175,7 +177,7 @@ public void testServerHostnameVerificationWithHostnameVerificationDisabled() thr
     public void testServerHostnameVerificationWithHostnameVerificationDisabledAndClientHostnameVerificationEnabled() throws Exception {
         VerifiableHostnameVerifier hostnameVerifier = new VerifiableHostnameVerifier();
         ZKTrustManager zkTrustManager = new ZKTrustManager(mockX509ExtendedTrustManager, false, true,
-                hostnameVerifier);
+                hostnameVerifier, false);
 
         X509Certificate[] certificateChain = createSelfSignedCertificateChain(IP_ADDRESS, HOSTNAME);
         zkTrustManager.checkServerTrusted(certificateChain, null, mockSocket);
@@ -190,7 +192,7 @@ public void testServerHostnameVerificationWithHostnameVerificationDisabledAndCli
     public void testServerHostnameVerificationWithIPAddress() throws Exception {
         VerifiableHostnameVerifier hostnameVerifier = new VerifiableHostnameVerifier();
         ZKTrustManager zkTrustManager = new ZKTrustManager(mockX509ExtendedTrustManager, true, false,
-                hostnameVerifier);
+                hostnameVerifier, false);
 
         X509Certificate[] certificateChain = createSelfSignedCertificateChain(IP_ADDRESS, null);
         zkTrustManager.checkServerTrusted(certificateChain, null, mockSocket);
@@ -205,7 +207,7 @@ public void testServerHostnameVerificationWithIPAddress() throws Exception {
     public void testServerHostnameVerificationWithHostname() throws Exception {
         VerifiableHostnameVerifier hostnameVerifier = new VerifiableHostnameVerifier();
         ZKTrustManager zkTrustManager = new ZKTrustManager(mockX509ExtendedTrustManager, true, false,
-                hostnameVerifier);
+                hostnameVerifier, false);
 
         X509Certificate[] certificateChain = createSelfSignedCertificateChain(null, HOSTNAME);
         zkTrustManager.checkServerTrusted(certificateChain, null, mockSocket);
@@ -220,7 +222,7 @@ public void testServerHostnameVerificationWithHostname() throws Exception {
     public void testClientHostnameVerificationWithHostnameVerificationDisabled() throws Exception {
         VerifiableHostnameVerifier hostnameVerifier = new VerifiableHostnameVerifier();
         ZKTrustManager zkTrustManager = new ZKTrustManager(mockX509ExtendedTrustManager, false, true,
-                hostnameVerifier);
+                hostnameVerifier, false);
 
         X509Certificate[] certificateChain = createSelfSignedCertificateChain(null, HOSTNAME);
         zkTrustManager.checkClientTrusted(certificateChain, null, mockSocket);
@@ -235,7 +237,7 @@ public void testClientHostnameVerificationWithHostnameVerificationDisabled() thr
     public void testClientHostnameVerificationWithClientHostnameVerificationDisabled() throws Exception {
         VerifiableHostnameVerifier hostnameVerifier = new VerifiableHostnameVerifier();
         ZKTrustManager zkTrustManager = new ZKTrustManager(mockX509ExtendedTrustManager, true,
-                false, hostnameVerifier);
+                false, hostnameVerifier, false);
 
         X509Certificate[] certificateChain = createSelfSignedCertificateChain(null, HOSTNAME);
         zkTrustManager.checkClientTrusted(certificateChain, null, mockSocket);
@@ -250,7 +252,7 @@ public void testClientHostnameVerificationWithClientHostnameVerificationDisabled
     public void testClientHostnameVerificationWithIPAddress() throws Exception {
         VerifiableHostnameVerifier hostnameVerifier = new VerifiableHostnameVerifier();
         ZKTrustManager zkTrustManager = new ZKTrustManager(mockX509ExtendedTrustManager, true, true,
-                hostnameVerifier);
+                hostnameVerifier, false);
 
         X509Certificate[] certificateChain = createSelfSignedCertificateChain(IP_ADDRESS, null);
         zkTrustManager.checkClientTrusted(certificateChain, null, mockSocket);
@@ -265,7 +267,7 @@ public void testClientHostnameVerificationWithIPAddress() throws Exception {
     public void testClientHostnameVerificationWithHostname() throws Exception {
         VerifiableHostnameVerifier hostnameVerifier = new VerifiableHostnameVerifier();
         ZKTrustManager zkTrustManager = new ZKTrustManager(mockX509ExtendedTrustManager, true, true,
-                hostnameVerifier);
+                hostnameVerifier, false);
 
         X509Certificate[] certificateChain = createSelfSignedCertificateChain(null, HOSTNAME);
         zkTrustManager.checkClientTrusted(certificateChain, null, mockSocket);
@@ -276,6 +278,64 @@ public void testClientHostnameVerificationWithHostname() throws Exception {
         verify(mockX509ExtendedTrustManager, times(1)).checkClientTrusted(certificateChain, null, mockSocket);
     }
 
+    @Test
+    public void testClientHostnameVerificationWithIpAddress_CertHostnameSan_NoReverseLookup_Fail() throws Exception {
+        VerifiableHostnameVerifier hostnameVerifier = new VerifiableHostnameVerifier();
+        ZKTrustManager zkTrustManager = new ZKTrustManager(mockX509ExtendedTrustManager, true, true,
+            hostnameVerifier, false);
+
+        X509Certificate[] certificateChain = createSelfSignedCertificateChain(null, HOSTNAME);
+        try (Socket s = createSocketWithIpAddress()) {
+            assertThrows(CertificateException.class, () -> zkTrustManager.checkClientTrusted(certificateChain, null, s));
+            verify(s, times(1)).getInetAddress();
+            assertEquals(Collections.singletonList(IP_ADDRESS), hostnameVerifier.hosts);
+            verify(mockX509ExtendedTrustManager, times(1)).checkClientTrusted(certificateChain, null, s);
+        }
+    }
+
+    @Test
+    public void testClientHostnameVerificationWithIpAddress_CertHostnameSan_WithReverseLookup() throws Exception {
+        VerifiableHostnameVerifier hostnameVerifier = new VerifiableHostnameVerifier();
+        ZKTrustManager zkTrustManager = new ZKTrustManager(mockX509ExtendedTrustManager, true, true,
+            hostnameVerifier, true);
+
+        X509Certificate[] certificateChain = createSelfSignedCertificateChain(null, HOSTNAME);
+        try (Socket s = createSocketWithIpAddress()) {
+            zkTrustManager.checkClientTrusted(certificateChain, null, s);
+            verify(s, times(1)).getInetAddress();
+            assertEquals(Arrays.asList(IP_ADDRESS, HOSTNAME), hostnameVerifier.hosts);
+            verify(mockX509ExtendedTrustManager, times(1)).checkClientTrusted(certificateChain, null, s);
+        }
+    }
+
+    @Test
+    public void testClientHostnameVerificationWithIpAddress_CertIpSan() throws Exception {
+        VerifiableHostnameVerifier hostnameVerifier = new VerifiableHostnameVerifier();
+        ZKTrustManager zkTrustManager = new ZKTrustManager(mockX509ExtendedTrustManager, true, true,
+            hostnameVerifier, false);
+
+        X509Certificate[] certificateChain = createSelfSignedCertificateChain(IP_ADDRESS, null);
+        try (Socket s = createSocketWithIpAddress()) {
+            zkTrustManager.checkClientTrusted(certificateChain, null, s);
+            verify(s, times(1)).getInetAddress();
+            assertEquals(Collections.singletonList(IP_ADDRESS), hostnameVerifier.hosts);
+            verify(mockX509ExtendedTrustManager, times(1)).checkClientTrusted(certificateChain, null, s);
+        }
+    }
+
+    private Socket createSocketWithHostname() throws UnknownHostException {
+        InetAddress mockInetAddress = InetAddress.getByName(HOSTNAME);
+        Socket s = mock(Socket.class);
+        when(s.getInetAddress()).thenAnswer((Answer) invocationOnMock -> mockInetAddress);
+        return s;
+    }
+
+    private Socket createSocketWithIpAddress() throws UnknownHostException {
+        InetAddress mockInetAddress = InetAddress.getByName(IP_ADDRESS);
+        Socket s = mock(Socket.class);
+        when(s.getInetAddress()).thenAnswer((Answer) invocationOnMock -> mockInetAddress);
+        return s;
+    }
 
     static class VerifiableHostnameVerifier extends ZKHostnameVerifier {
 
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSSLTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSSLTest.java
index 3ced181..1671334 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSSLTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSSLTest.java
@@ -142,6 +142,7 @@ public class QuorumSSLTest extends QuorumPeerTestBase {
 
     private static final char[] PASSWORD = "testpass".toCharArray();
     private static final String HOSTNAME = "localhost";
+    private static final String IPADDRESS = "127.0.0.1";
 
     private QuorumX509Util quorumX509Util;
 
@@ -487,6 +488,7 @@ private void clearSSLSystemProperties() {
         System.clearProperty(quorumX509Util.getSslTruststorePasswdProperty());
         System.clearProperty(quorumX509Util.getSslTruststorePasswdPathProperty());
         System.clearProperty(quorumX509Util.getSslHostnameVerificationEnabledProperty());
+        System.clearProperty(quorumX509Util.getSslAllowReverseDnsLookupProperty());
         System.clearProperty(quorumX509Util.getSslOcspEnabledProperty());
         System.clearProperty(quorumX509Util.getSslCrlEnabledProperty());
         System.clearProperty(quorumX509Util.getCipherSuitesProperty());
@@ -700,6 +702,8 @@ public void testHostnameVerificationForInvalidMultiAddressServerConfig(boolean f
     @Timeout(value = 5, unit = TimeUnit.MINUTES)
     public void testHostnameVerificationWithInvalidIpAddressAndValidHostname(boolean fipsEnabled) throws Exception {
         System.setProperty(quorumX509Util.getFipsModeProperty(), Boolean.toString(fipsEnabled));
+        // We need reverse DNS lookup to get this working, because quorum is connecting via ip addresses
+        System.setProperty(quorumX509Util.getSslAllowReverseDnsLookupProperty(), Boolean.toString(true));
 
         String badhostnameKeystorePath = tmpDir + "/badhost.jks";
         X509Certificate badHostCert = buildEndEntityCert(
@@ -805,7 +809,7 @@ public void testCertificateRevocationList(boolean fipsEnabled) throws Exception
             rootCertificate,
             rootKeyPair.getPrivate(),
             HOSTNAME,
-            null,
+            IPADDRESS,
             crlPath,
             null);
         writeKeystore(revokedInCRLCert, defaultKeyPair, revokedInCRLKeystorePath);
@@ -835,7 +839,7 @@ public void testCertificateRevocationList(boolean fipsEnabled) throws Exception
             rootCertificate,
             rootKeyPair.getPrivate(),
             HOSTNAME,
-            null,
+            IPADDRESS,
             crlPath,
             null);
         writeKeystore(validCertificate, defaultKeyPair, validKeystorePath);
@@ -874,7 +878,7 @@ public void testOCSP(boolean fipsEnabled) throws Exception {
             rootCertificate,
             rootKeyPair.getPrivate(),
             HOSTNAME,
-            null,
+            IPADDRESS,
             null,
             ocspPort);
         writeKeystore(revokedInOCSPCert, defaultKeyPair, revokedInOCSPKeystorePath);
@@ -909,7 +913,7 @@ public void testOCSP(boolean fipsEnabled) throws Exception {
                 rootCertificate,
                 rootKeyPair.getPrivate(),
                 HOSTNAME,
-                null,
+                IPADDRESS,
                 null,
                 ocspPort);
             writeKeystore(validCertificate, defaultKeyPair, validKeystorePath);
diff --git a/zookeeper-server/src/test/resources/data/ssl/README.md b/zookeeper-server/src/test/resources/data/ssl/README.md
index b8823d8..26d4d0b 100644
--- a/zookeeper-server/src/test/resources/data/ssl/README.md
+++ b/zookeeper-server/src/test/resources/data/ssl/README.md
@@ -1,6 +1,21 @@
 SSL test data
 ===================
 
+Create keystore with certificate
+```
+keytool -genkeypair -alias test -keyalg RSA -keysize 2048 -dname "CN=localhost,OU=ZooKeeper,O=Apache,L=Unknown,ST=Unknown,C=Unknown" -keypass testpass -keystore keystore.jks -storepass testpass -ext SAN=DNS:localhost,IP:127.0.0.1
+```
+
+Export certificate to file
+```
+keytool -exportcert -alias test -keystore keystore.jks -file test.cer -rfc
+```
+
+Create truststore
+```
+keytool -importcert -alias test -file test.cer -keystore truststore.jks -storepass testpass
+```
+
 testKeyStore.jks
 ---
 Testing keystore, password is "testpass".
diff --git a/zookeeper-server/src/test/resources/data/ssl/testKeyStore.jks b/zookeeper-server/src/test/resources/data/ssl/testKeyStore.jks
index 40a7d0b..60e5d52 100644
--- a/zookeeper-server/src/test/resources/data/ssl/testKeyStore.jks
+++ b/zookeeper-server/src/test/resources/data/ssl/testKeyStore.jks
Binary files differ
diff --git a/zookeeper-server/src/test/resources/data/ssl/testTrustStore.jks b/zookeeper-server/src/test/resources/data/ssl/testTrustStore.jks
index 33f09c1..bced986 100644
--- a/zookeeper-server/src/test/resources/data/ssl/testTrustStore.jks
+++ b/zookeeper-server/src/test/resources/data/ssl/testTrustStore.jks
Binary files differ