HADOOP-18709. Add curator based ZooKeeper communication support over SSL/TLS into the common library. Contributed by Ferenc Erdelyi
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 58006c0..426f7a4 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -341,6 +341,14 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-handler</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport-native-epoll</artifactId>
+    </dependency>
+    <dependency>
       <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-core</artifactId>
     </dependency>
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 9d62243..63f494a2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -417,6 +417,14 @@
   /** How often to retry a ZooKeeper operation  in milliseconds. */
   public static final String ZK_RETRY_INTERVAL_MS =
       ZK_PREFIX + "retry-interval-ms";
+  /** Keystore location for ZooKeeper client connection over SSL. */
+  public static final String ZK_SSL_KEYSTORE_LOCATION = ZK_PREFIX + "ssl.keystore.location";
+  /** Keystore password for ZooKeeper client connection over SSL. */
+  public static final String ZK_SSL_KEYSTORE_PASSWORD = ZK_PREFIX + "ssl.keystore.password";
+  /** Truststore location for ZooKeeper client connection over SSL. */
+  public static final String ZK_SSL_TRUSTSTORE_LOCATION = ZK_PREFIX + "ssl.truststore.location";
+  /** Truststore password for ZooKeeper client connection over SSL.  */
+  public static final String ZK_SSL_TRUSTSTORE_PASSWORD = ZK_PREFIX + "ssl.truststore.password";
   public static final int    ZK_RETRY_INTERVAL_MS_DEFAULT = 1000;
   /** Default domain name resolver for hadoop to use. */
   public static final String HADOOP_DOMAINNAME_RESOLVER_IMPL =
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java
index c11b868..81ee466 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java
@@ -23,6 +23,7 @@
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.AuthInfo;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -39,13 +40,17 @@
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.util.Preconditions;
 
+import javax.naming.ConfigurationException;
+
 /**
  * Helper class that provides utility methods specific to ZK operations.
  */
@@ -122,7 +127,7 @@
    * Start the connection to the ZooKeeper ensemble.
    * @throws IOException If the connection cannot be started.
    */
-  public void start() throws IOException {
+  public void start() throws IOException{
     this.start(new ArrayList<>());
   }
 
@@ -132,6 +137,20 @@
    * @throws IOException If the connection cannot be started.
    */
   public void start(List<AuthInfo> authInfos) throws IOException {
+    this.start(authInfos, false);
+  }
+
+  /**
+   * Start the connection to the ZooKeeper ensemble.
+   *
+   * @param authInfos  List of authentication keys.
+   * @param sslEnabled If the connection should be SSL/TLS encrypted.
+   * @throws IOException            If the connection cannot be started.
+   */
+  public void start(List<AuthInfo> authInfos, boolean sslEnabled)
+      throws IOException{
+
+    ZKClientConfig zkClientConfig = new ZKClientConfig();
 
     // Connect to the ZooKeeper ensemble
     String zkHostPort = conf.get(CommonConfigurationKeys.ZK_ADDRESS);
@@ -139,6 +158,8 @@
       throw new IOException(
           CommonConfigurationKeys.ZK_ADDRESS + " is not configured.");
     }
+    LOG.debug("Configured {} as {}", CommonConfigurationKeys.ZK_ADDRESS, zkHostPort);
+
     int numRetries = conf.getInt(CommonConfigurationKeys.ZK_NUM_RETRIES,
         CommonConfigurationKeys.ZK_NUM_RETRIES_DEFAULT);
     int zkSessionTimeout = conf.getInt(CommonConfigurationKeys.ZK_TIMEOUT_MS,
@@ -156,21 +177,49 @@
     for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
       authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
     }
-
-    CuratorFramework client = CuratorFrameworkFactory.builder()
-        .connectString(zkHostPort)
-        .zookeeperFactory(new HadoopZookeeperFactory(
-            conf.get(CommonConfigurationKeys.ZK_SERVER_PRINCIPAL),
-            conf.get(CommonConfigurationKeys.ZK_KERBEROS_PRINCIPAL),
-            conf.get(CommonConfigurationKeys.ZK_KERBEROS_KEYTAB)))
-        .sessionTimeoutMs(zkSessionTimeout)
-        .retryPolicy(retryPolicy)
-        .authorization(authInfos)
-        .build();
+    if (sslEnabled) {
+      validateSslConfiguration(conf);
+    }
+    CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkHostPort)
+        .zookeeperFactory(
+            new HadoopZookeeperFactory(conf.get(CommonConfigurationKeys.ZK_SERVER_PRINCIPAL),
+                conf.get(CommonConfigurationKeys.ZK_KERBEROS_PRINCIPAL),
+                conf.get(CommonConfigurationKeys.ZK_KERBEROS_KEYTAB), sslEnabled,
+                new TruststoreKeystore(conf))).zkClientConfig(zkClientConfig)
+        .sessionTimeoutMs(zkSessionTimeout).retryPolicy(retryPolicy)
+        .authorization(authInfos).build();
     client.start();
 
     this.curator = client;
   }
+  /* Check on SSL/TLS client connection requirements to emit the name of the
+   configuration missing. It improves supportability. */
+  private void validateSslConfiguration(Configuration config) throws IOException {
+    if (StringUtils.isEmpty(config.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION))) {
+      throw new IOException(
+          "The SSL encryption is enabled for the component's ZooKeeper client connection, "
+              + "however the " + CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION + " " +
+              "parameter is empty.");
+    }
+    if (StringUtils.isEmpty(config.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD))) {
+      throw new IOException(
+          "The SSL encryption is enabled for the component's " + "ZooKeeper client connection, "
+              + "however the " + CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD + " " +
+              "parameter is empty.");
+    }
+    if (StringUtils.isEmpty(config.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION))) {
+      throw new IOException(
+          "The SSL encryption is enabled for the component's ZooKeeper client connection, "
+              + "however the " + CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION + " " +
+              "parameter is empty.");
+    }
+    if (StringUtils.isEmpty(config.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD))) {
+      throw new IOException(
+          "The SSL encryption is enabled for the component's ZooKeeper client connection, "
+              + "however the " + CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD + "  " +
+              "parameter is empty.");
+    }
+  }
 
   /**
    * Get ACLs for a ZNode.
@@ -414,14 +463,14 @@
         throws Exception {
       this.fencingNodePath = fencingNodePath;
       curatorOperations.add(curator.transactionOp().create()
-                              .withMode(CreateMode.PERSISTENT)
-                              .withACL(fencingACL)
-                              .forPath(fencingNodePath, new byte[0]));
+          .withMode(CreateMode.PERSISTENT)
+          .withACL(fencingACL)
+          .forPath(fencingNodePath, new byte[0]));
     }
 
     public void commit() throws Exception {
       curatorOperations.add(curator.transactionOp().delete()
-                              .forPath(fencingNodePath));
+          .forPath(fencingNodePath));
       curator.transaction().forOperations(curatorOperations);
       curatorOperations.clear();
     }
@@ -429,21 +478,21 @@
     public void create(String path, byte[] data, List<ACL> acl, CreateMode mode)
         throws Exception {
       curatorOperations.add(curator.transactionOp().create()
-                              .withMode(mode)
-                              .withACL(acl)
-                              .forPath(path, data));
+          .withMode(mode)
+          .withACL(acl)
+          .forPath(path, data));
     }
 
     public void delete(String path) throws Exception {
       curatorOperations.add(curator.transactionOp().delete()
-                              .forPath(path));
+          .forPath(path));
     }
 
     public void setData(String path, byte[] data, int version)
         throws Exception {
       curatorOperations.add(curator.transactionOp().setData()
-                              .withVersion(version)
-                              .forPath(path, data));
+          .withVersion(version)
+          .forPath(path, data));
     }
   }
 
@@ -452,21 +501,53 @@
     private final String zkPrincipal;
     private final String kerberosPrincipal;
     private final String kerberosKeytab;
+    private final Boolean sslEnabled;
+    private final TruststoreKeystore truststoreKeystore;
 
+    /**
+     * Constructor for the helper class to configure the ZooKeeper client connection.
+     * @param zkPrincipal Optional.
+     */
     public HadoopZookeeperFactory(String zkPrincipal) {
       this(zkPrincipal, null, null);
     }
 
+    /**
+     * Constructor for the helper class to configure the ZooKeeper client connection.
+     * @param zkPrincipal Optional.
+     * @param kerberosPrincipal Optional. Use along with kerberosKeytab.
+     * @param kerberosKeytab Optional. Use along with kerberosPrincipal.
+     */
     public HadoopZookeeperFactory(String zkPrincipal, String kerberosPrincipal,
         String kerberosKeytab) {
+      this(zkPrincipal, kerberosPrincipal, kerberosKeytab, false,
+          new TruststoreKeystore(new Configuration()));
+    }
+
+    /**
+     * Constructor for the helper class to configure the ZooKeeper client connection.
+     *
+     * @param zkPrincipal        Optional.
+     * @param kerberosPrincipal  Optional. Use along with kerberosKeytab.
+     * @param kerberosKeytab     Optional. Use along with kerberosPrincipal.
+     * @param sslEnabled         Flag to enable SSL/TLS ZK client connection for each component
+     *                           independently.
+     * @param truststoreKeystore TruststoreKeystore object containing the keystoreLocation,
+     *                           keystorePassword, truststoreLocation, truststorePassword for
+     *                           SSL/TLS connection when sslEnabled is set to true.
+     */
+    public HadoopZookeeperFactory(String zkPrincipal, String kerberosPrincipal,
+        String kerberosKeytab, boolean sslEnabled, TruststoreKeystore truststoreKeystore) {
       this.zkPrincipal = zkPrincipal;
       this.kerberosPrincipal = kerberosPrincipal;
       this.kerberosKeytab = kerberosKeytab;
+      this.sslEnabled = sslEnabled;
+      this.truststoreKeystore = truststoreKeystore;
     }
 
     @Override
     public ZooKeeper newZooKeeper(String connectString, int sessionTimeout,
-                                  Watcher watcher, boolean canBeReadOnly
+        Watcher watcher, boolean canBeReadOnly
     ) throws Exception {
       ZKClientConfig zkClientConfig = new ZKClientConfig();
       if (zkPrincipal != null) {
@@ -478,10 +559,65 @@
       if (zkClientConfig.isSaslClientEnabled() && !isJaasConfigurationSet(zkClientConfig)) {
         setJaasConfiguration(zkClientConfig);
       }
+      if (sslEnabled) {
+        setSslConfiguration(zkClientConfig);
+      }
       return new ZooKeeper(connectString, sessionTimeout, watcher,
           canBeReadOnly, zkClientConfig);
     }
 
+    /**
+     * Configure ZooKeeper Client with SSL/TLS connection.
+     * @param zkClientConfig ZooKeeper Client configuration
+     */
+    private void setSslConfiguration(ZKClientConfig zkClientConfig) throws ConfigurationException {
+      this.setSslConfiguration(zkClientConfig, new ClientX509Util());
+    }
+
+    private void setSslConfiguration(ZKClientConfig zkClientConfig, ClientX509Util x509Util)
+        throws ConfigurationException {
+      validateSslConfiguration();
+      LOG.info("Configuring the ZooKeeper client to use SSL/TLS encryption for connecting to the "
+          + "ZooKeeper server.");
+      LOG.debug("Configuring the ZooKeeper client with {} location: {}.",
+          this.truststoreKeystore.keystoreLocation,
+          CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION);
+      LOG.debug("Configuring the ZooKeeper client with {} location: {}.",
+          this.truststoreKeystore.truststoreLocation,
+          CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION);
+
+      zkClientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
+      zkClientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
+          "org.apache.zookeeper.ClientCnxnSocketNetty");
+      zkClientConfig.setProperty(x509Util.getSslKeystoreLocationProperty(),
+          this.truststoreKeystore.keystoreLocation);
+      zkClientConfig.setProperty(x509Util.getSslKeystorePasswdProperty(),
+          this.truststoreKeystore.keystorePassword);
+      zkClientConfig.setProperty(x509Util.getSslTruststoreLocationProperty(),
+          this.truststoreKeystore.truststoreLocation);
+      zkClientConfig.setProperty(x509Util.getSslTruststorePasswdProperty(),
+          this.truststoreKeystore.truststorePassword);
+    }
+
+    private void validateSslConfiguration() throws ConfigurationException {
+      if (StringUtils.isEmpty(this.truststoreKeystore.keystoreLocation)) {
+        throw new ConfigurationException(
+            "The keystore location parameter is empty for the ZooKeeper client connection.");
+      }
+      if (StringUtils.isEmpty(this.truststoreKeystore.keystorePassword)) {
+        throw new ConfigurationException(
+            "The keystore password parameter is empty for the ZooKeeper client connection.");
+      }
+      if (StringUtils.isEmpty(this.truststoreKeystore.truststoreLocation)) {
+        throw new ConfigurationException(
+            "The truststore location parameter is empty for the ZooKeeper client connection.");
+      }
+      if (StringUtils.isEmpty(this.truststoreKeystore.truststorePassword)) {
+        throw new ConfigurationException(
+            "The truststore password parameter is empty for the ZooKeeper client connection.");
+      }
+    }
+
     private boolean isJaasConfigurationSet(ZKClientConfig zkClientConfig) {
       String clientConfig = zkClientConfig.getProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY,
           ZKClientConfig.LOGIN_CONTEXT_NAME_KEY_DEFAULT);
@@ -503,4 +639,44 @@
       zkClientConfig.setProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY, JAAS_CLIENT_ENTRY);
     }
   }
+
+  /**
+   * Helper class to contain the Truststore/Keystore paths for the ZK client connection over
+   * SSL/TLS.
+   */
+  public static class TruststoreKeystore {
+    private final String keystoreLocation;
+    private final String keystorePassword;
+    private final String truststoreLocation;
+    private final String truststorePassword;
+
+    /**
+     * Configuration for the ZooKeeper connection when SSL/TLS is enabled.
+     * When a value is not configured, ensure that empty string is set instead of null.
+     *
+     * @param conf ZooKeeper Client configuration
+     */
+    public TruststoreKeystore(Configuration conf) {
+      keystoreLocation = conf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION, "");
+      keystorePassword = conf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD, "");
+      truststoreLocation = conf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION, "");
+      truststorePassword = conf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD, "");
+    }
+
+    public String getKeystoreLocation() {
+      return keystoreLocation;
+    }
+
+    public String getKeystorePassword() {
+      return keystorePassword;
+    }
+
+    public String getTruststoreLocation() {
+      return truststoreLocation;
+    }
+
+    public String getTruststorePassword() {
+      return truststorePassword;
+    }
+  }
 }
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 537feeba..34dadf3 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -3924,6 +3924,35 @@
         the ZK CLI).
     </description>
   </property>
+
+  <property>
+    <name>hadoop.zk.ssl.keystore.location</name>
+    <description>
+      Keystore location for ZooKeeper client connection over SSL.
+    </description>
+  </property>
+
+  <property>
+    <name>hadoop.zk.ssl.keystore.password</name>
+    <description>
+      Keystore password for ZooKeeper client connection over SSL.
+    </description>
+  </property>
+
+  <property>
+    <name>hadoop.zk.ssl.truststore.location</name>
+    <decription>
+        Truststore location for ZooKeeper client connection over SSL.
+    </decription>
+  </property>
+
+  <property>
+    <name>hadoop.zk.ssl.truststore.password</name>
+    <description>
+      Truststore password for ZooKeeper client connection over SSL.
+    </description>
+  </property>
+
   <property>
     <name>hadoop.system.tags</name>
     <value>YARN,HDFS,NAMENODE,DATANODE,REQUIRED,SECURITY,KERBEROS,PERFORMANCE,CLIENT
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java
new file mode 100644
index 0000000..d83279a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.zookeeper.ClientCnxnSocketNetty;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.apache.zookeeper.server.NettyServerCnxnFactory;
+
+import static org.apache.hadoop.fs.FileContext.LOG;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  public static final boolean DELETE_DATA_DIRECTORY_ON_CLOSE = true;
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final int SECURE_CLIENT_PORT = 2281;
+  static final int JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+  private static final int SERVER_ID = 1;
+  private static final int TICK_TIME = 100;
+  private static final int MAX_CLIENT_CNXNS = 10;
+  public static final int ELECTION_PORT = -1;
+  public static final int QUORUM_PORT = -1;
+
+  @Before
+  public void setup() throws Exception {
+    // inject values to the ZK configuration file for secure connection
+    Map<String, Object> customConfiguration = new HashMap<>();
+    customConfiguration.put("secureClientPort", String.valueOf(SECURE_CLIENT_PORT));
+    customConfiguration.put("audit.enable", true);
+    this.hadoopConf = setUpSecureConfig();
+    InstanceSpec spec =
+        new InstanceSpec(ZK_DATA_DIR, SECURE_CLIENT_PORT, ELECTION_PORT, QUORUM_PORT,
+            DELETE_DATA_DIRECTORY_ON_CLOSE, SERVER_ID, TICK_TIME, MAX_CLIENT_CNXNS,
+            customConfiguration);
+    this.server = new TestingServer(spec, true);
+    this.hadoopConf.set(CommonConfigurationKeys.ZK_ADDRESS, this.server.getConnectString());
+    this.curator = new ZKCuratorManager(this.hadoopConf);
+    this.curator.start(new ArrayList<>(), true);
+  }
+
+  /**
+   * A static method to configure the test ZK server to accept secure client connection.
+   * The self-signed certificates were generated for testing purposes as described below.
+   * For the ZK client to connect with the ZK server, the ZK server's keystore and truststore
+   * should be used.
+   * For testing purposes the keystore and truststore were generated using default values.
+   * 1. to generate the keystore.jks file:
+   * # keytool -genkey -alias mockcert -keyalg RSA -keystore keystore.jks -keysize 2048
+   * 2. generate the ca-cert and the ca-key:
+   * # openssl req -new -x509 -keyout ca-key -out ca-cert
+   * 3. to generate the certificate signing request (cert-file):
+   * # keytool -keystore keystore.jks -alias mockcert -certreq -file certificate-request
+   * 4. to generate the ca-cert.srl file and make the cert valid for 10 years:
+   * # openssl x509 -req -CA ca-cert -CAkey ca-key -in certificate-request -out cert-signed
+   * -days 3650 -CAcreateserial -passin pass:password
+   * 5. add the ca-cert to the keystore.jks:
+   * # keytool -keystore keystore.jks -alias mockca -import -file ca-cert
+   * 6. install the signed certificate to the keystore:
+   * # keytool -keystore keystore.jks -alias mockcert -import -file cert-signed
+   * 7. add the certificate to the truststore:
+   * # keytool -keystore truststore.jks -alias mockcert -import -file ca-cert
+   * For our purpose, we only need the end result of this process: the keystore.jks and the
+   * truststore.jks files.
+   *
+   * @return conf The method returns the updated Configuration.
+   */
+  public static Configuration setUpSecureConfig() {
+    return setUpSecureConfig(new Configuration(),
+        "src/test/java/org/apache/hadoop/util/curator" + "/resources/data");
+  }
+
+  public static Configuration setUpSecureConfig(Configuration conf, String testDataPath) {
+    System.setProperty("zookeeper.serverCnxnFactory",
+        NettyServerCnxnFactory.class.getCanonicalName());
+
+    System.setProperty("zookeeper.ssl.keyStore.location", testDataPath + "keystore.jks");
+    System.setProperty("zookeeper.ssl.keyStore.password", "password");
+    System.setProperty("zookeeper.ssl.trustStore.location", testDataPath + "truststore.jks");
+    System.setProperty("zookeeper.ssl.trustStore.password", "password");
+    System.setProperty("zookeeper.request.timeout", "12345");
+
+    System.setProperty("jute.maxbuffer", String.valueOf(JUTE_MAXBUFFER));
+
+    System.setProperty("javax.net.debug", "ssl");
+    System.setProperty("zookeeper.authProvider.x509",
+        "org.apache.zookeeper.server.auth.X509AuthenticationProvider");
+
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION,
+        testDataPath + "/ssl/keystore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD, "password");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION,
+        testDataPath + "/ssl/truststore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD, "password");
+    return conf;
+  }
+
+  @After
+  public void teardown() throws Exception {
+    this.curator.close();
+    if (this.server != null) {
+      this.server.close();
+      this.server = null;
+    }
+  }
+
+  @Test
+  public void testSecureZKConfiguration() throws Exception {
+    LOG.info("Entered to the testSecureZKConfiguration test case.");
+    // Validate that HadoopZooKeeperFactory will set ZKConfig with given principals
+    ZKCuratorManager.HadoopZookeeperFactory factory =
+        new ZKCuratorManager.HadoopZookeeperFactory(null, null, null, true,
+            new ZKCuratorManager.TruststoreKeystore(hadoopConf));
+    ZooKeeper zk = factory.newZooKeeper(this.server.getConnectString(), 1000, null, false);
+    validateSSLConfiguration(this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION),
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD),
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION),
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD), zk);
+  }
+
+  private void validateSSLConfiguration(String keystoreLocation, String keystorePassword,
+      String truststoreLocation, String truststorePassword, ZooKeeper zk) {
+    try (ClientX509Util x509Util = new ClientX509Util()) {
+      //testing if custom values are set properly
+      assertEquals("Validate that expected clientConfig is set in ZK config", keystoreLocation,
+          zk.getClientConfig().getProperty(x509Util.getSslKeystoreLocationProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", keystorePassword,
+          zk.getClientConfig().getProperty(x509Util.getSslKeystorePasswdProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", truststoreLocation,
+          zk.getClientConfig().getProperty(x509Util.getSslTruststoreLocationProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", truststorePassword,
+          zk.getClientConfig().getProperty(x509Util.getSslTruststorePasswdProperty()));
+    }
+    //testing if constant values hardcoded into the code are set properly
+    assertEquals("Validate that expected clientConfig is set in ZK config", Boolean.TRUE.toString(),
+        zk.getClientConfig().getProperty(ZKClientConfig.SECURE_CLIENT));
+    assertEquals("Validate that expected clientConfig is set in ZK config",
+        ClientCnxnSocketNetty.class.getCanonicalName(),
+        zk.getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET));
+  }
+
+  @Test
+  public void testTruststoreKeystoreConfiguration() {
+    LOG.info("Entered to the testTruststoreKeystoreConfiguration test case.");
+    /*
+      By default the truststore/keystore configurations are not set, hence the values are null.
+      Validate that the null values are converted into empty strings by the class.
+     */
+    Configuration conf = new Configuration();
+    ZKCuratorManager.TruststoreKeystore truststoreKeystore =
+        new ZKCuratorManager.TruststoreKeystore(conf);
+
+    assertEquals("Validate that null value is converted to empty string.", "",
+        truststoreKeystore.getKeystoreLocation());
+    assertEquals("Validate that null value is converted to empty string.", "",
+        truststoreKeystore.getKeystorePassword());
+    assertEquals("Validate that null value is converted to empty string.", "",
+        truststoreKeystore.getTruststoreLocation());
+    assertEquals("Validate that null value is converted to empty string.", "",
+        truststoreKeystore.getTruststorePassword());
+
+    //Validate that non-null values will remain intact
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION, "/keystore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD, "keystorePassword");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION, "/truststore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD, "truststorePassword");
+    ZKCuratorManager.TruststoreKeystore truststoreKeystore1 =
+        new ZKCuratorManager.TruststoreKeystore(conf);
+    assertEquals("Validate that non-null value kept intact.", "/keystore.jks",
+        truststoreKeystore1.getKeystoreLocation());
+    assertEquals("Validate that null value is converted to empty string.", "keystorePassword",
+        truststoreKeystore1.getKeystorePassword());
+    assertEquals("Validate that null value is converted to empty string.", "/truststore.jks",
+        truststoreKeystore1.getTruststoreLocation());
+    assertEquals("Validate that null value is converted to empty string.", "truststorePassword",
+        truststoreKeystore1.getTruststorePassword());
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/resources/data/ssl/keystore.jks b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/resources/data/ssl/keystore.jks
new file mode 100644
index 0000000..89792c1
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/resources/data/ssl/keystore.jks
Binary files differ
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/resources/data/ssl/truststore.jks b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/resources/data/ssl/truststore.jks
new file mode 100644
index 0000000..c78514c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/resources/data/ssl/truststore.jks
Binary files differ