HDDS-4915. [SCM HA Security] Integrate CertClient. (#2000)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 8703011..e714f7f 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -82,7 +82,7 @@
   private final long timeout;
   private SecurityConfig secConfig;
   private final boolean topologyAwareRead;
-  private X509Certificate caCert;
+  private List<X509Certificate> caCerts;
   // Cache the DN which returned the GetBlock command so that the ReadChunk
   // command can be sent to the same DN.
   private Map<DatanodeBlockID, DatanodeDetails> getBlockDNcache;
@@ -93,10 +93,10 @@
    *
    * @param pipeline - Pipeline that defines the machines.
    * @param config   -- Ozone Config
-   * @param caCert   - SCM ca certificate.
+   * @param caCerts - list of SCM CA certificates.
    */
   public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
-      X509Certificate caCert) {
+      List<X509Certificate> caCerts) {
     super();
     Preconditions.checkNotNull(pipeline);
     Preconditions.checkNotNull(config);
@@ -114,7 +114,7 @@
     this.topologyAwareRead = config.getBoolean(
         OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
         OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
-    this.caCert = caCert;
+    this.caCerts = caCerts;
     this.getBlockDNcache = new ConcurrentHashMap<>();
   }
 
@@ -179,8 +179,8 @@
             .intercept(new GrpcClientInterceptor());
     if (secConfig.isGrpcTlsEnabled()) {
       SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
-      if (caCert != null) {
-        sslContextBuilder.trustManager(caCert);
+      if (caCerts != null) {
+        sslContextBuilder.trustManager(caCerts);
       }
       if (secConfig.useTestCert()) {
         channelBuilder.overrideAuthority("localhost");
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index 31728c6..f016972 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -20,8 +20,8 @@
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
@@ -31,8 +31,6 @@
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
-import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -68,7 +66,7 @@
   //TODO : change this to SCM configuration class
   private final ConfigurationSource conf;
   private final Cache<String, XceiverClientSpi> clientCache;
-  private X509Certificate caCert;
+  private List<X509Certificate> caCerts;
 
   private static XceiverClientMetrics metrics;
   private boolean isSecurityEnabled;
@@ -86,20 +84,15 @@
 
   public XceiverClientManager(ConfigurationSource conf,
       ScmClientConfig clientConf,
-      String caCertPem) throws IOException {
+      List<X509Certificate> caCerts) throws IOException {
     Preconditions.checkNotNull(clientConf);
     Preconditions.checkNotNull(conf);
     long staleThresholdMs = clientConf.getStaleThreshold(MILLISECONDS);
     this.conf = conf;
     this.isSecurityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
     if (isSecurityEnabled) {
-      Preconditions.checkNotNull(caCertPem);
-      try {
-        this.caCert = CertificateCodec.getX509Cert(caCertPem);
-      } catch (CertificateException ex) {
-        throw new SCMSecurityException("Error: Fail to get SCM CA certificate",
-            ex);
-      }
+      Preconditions.checkNotNull(caCerts);
+      this.caCerts = caCerts;
     }
 
     this.clientCache = CacheBuilder.newBuilder()
@@ -232,10 +225,10 @@
             switch (type) {
             case RATIS:
               client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf,
-                  caCert);
+                  caCerts);
               break;
             case STAND_ALONE:
-              client = new XceiverClientGrpc(pipeline, conf, caCert);
+              client = new XceiverClientGrpc(pipeline, conf, caCerts);
               break;
             case CHAINED:
             default:
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index b6cd216..6982d41 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -81,13 +81,13 @@
 
   public static XceiverClientRatis newXceiverClientRatis(
       org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
-      ConfigurationSource ozoneConf, X509Certificate caCert) {
+      ConfigurationSource ozoneConf, List<X509Certificate> caCerts) {
     final String rpcType = ozoneConf
         .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
             ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
     final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
     final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
-        SecurityConfig(ozoneConf), caCert);
+        SecurityConfig(ozoneConf), caCerts);
     return new XceiverClientRatis(pipeline,
         SupportedRpcType.valueOfIgnoreCase(rpcType),
         retryPolicy, tlsConfig, ozoneConf);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index 06acb44..e310cc9 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -294,11 +294,11 @@
   // For External gRPC client to server with gRPC TLS.
   // No mTLS for external client as SCM CA does not issued certificates for them
   public static GrpcTlsConfig createTlsClientConfig(SecurityConfig conf,
-      X509Certificate caCert) {
+      List<X509Certificate> caCerts) {
     GrpcTlsConfig tlsConfig = null;
     if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
       tlsConfig = new GrpcTlsConfig(null, null,
-          caCert, false);
+          caCerts, false);
     }
     return tlsConfig;
   }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
index b3fc40e..594985e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
@@ -173,4 +173,15 @@
     }
     return conf;
   }
+
+  /**
+   * Get SCM Node Id list.
+   * @param configuration ozone configuration used to resolve the SCM service id.
+   * @return list of node ids.
+   */
+  public static Collection<String> getSCMNodeIds(
+      ConfigurationSource configuration) {
+    String scmServiceId = getScmServiceId(configuration);
+    return getSCMNodeIds(configuration, scmServiceId);
+  }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
index d75acc4..7e008af 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
@@ -108,6 +108,7 @@
     DEFAULT,
     MISSING_BLOCK_TOKEN,
     BLOCK_TOKEN_VERIFICATION_FAILED,
-    GET_ROOT_CA_CERT_FAILED
+    GET_ROOT_CA_CERT_FAILED,
+    NOT_A_PRIMARY_SCM
   }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index b12a022..90ea073 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -41,6 +41,7 @@
   public static final String SCM_ID = "scmUuid";
   public static final String CLUSTER_ID_PREFIX = "CID-";
   public static final String SCM_CERT_SERIAL_ID = "scmCertSerialId";
+  public static final String PRIMARY_SCM_NODE_ID = "primaryScmNodeId";
 
   public static final String OZONE_SIMPLE_ROOT_USER = "root";
   public static final String OZONE_SIMPLE_HDFS_USER = "hdfs";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneSecurityUtil.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneSecurityUtil.java
index 726862c..21e6ffb 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneSecurityUtil.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneSecurityUtil.java
@@ -23,6 +23,8 @@
 import java.net.InetAddress;
 import java.net.NetworkInterface;
 import java.nio.file.Path;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Enumeration;
@@ -39,6 +41,8 @@
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_HTTP_SECURITY_ENABLED_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+
+import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -120,4 +124,25 @@
 
     return hostIps;
   }
+
+  /**
+   * Convert list of string encoded certificates to list of X509Certificate.
+   * @param pemEncodedCerts list of PEM encoded certificate strings.
+   * @return list of X509Certificate.
+   * @throws IOException
+   */
+  public static List<X509Certificate> convertToX509(
+      List<String> pemEncodedCerts) throws IOException {
+    List<X509Certificate> x509Certificates =
+        new ArrayList<>(pemEncodedCerts.size());
+    for (String cert : pemEncodedCerts) {
+      try {
+        x509Certificates.add(CertificateCodec.getX509Certificate(cert));
+      } catch (CertificateException ex) {
+        LOG.error("Error while converting to X509 format", ex);
+        throw new IOException(ex);
+      }
+    }
+    return x509Certificates;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index 325736e..12f95c7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -355,6 +355,12 @@
         dnCertClient.storeCertificate(pemEncodedCert, true);
         dnCertClient.storeCertificate(response.getX509CACertificate(), true,
             true);
+
+        // Store Root CA certificate.
+        if (response.hasX509RootCACertificate()) {
+          dnCertClient.storeRootCACertificate(
+              response.getX509RootCACertificate(), true);
+        }
         String dnCertSerialId = getX509Certificate(pemEncodedCert).
             getSerialNumber().toString();
         datanodeDetails.setCertSerialId(dnCertSerialId);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 72859e1..6fd2706 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -59,6 +60,7 @@
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.hdds.utils.HAUtils;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -461,19 +463,21 @@
   // DN Ratis server act as both SSL client and server and we must pass TLS
   // configuration for both.
   private static Parameters createTlsParameters(SecurityConfig conf,
-      CertificateClient caClient) {
+      CertificateClient caClient) throws IOException {
     Parameters parameters = new Parameters();
 
     if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
+      List<X509Certificate> caList = HAUtils.buildCAX509List(caClient,
+          conf.getConfiguration());
       GrpcTlsConfig serverConfig = new GrpcTlsConfig(
           caClient.getPrivateKey(), caClient.getCertificate(),
-          caClient.getCACertificate(), true);
+          caList, true);
       GrpcConfigKeys.Server.setTlsConf(parameters, serverConfig);
       GrpcConfigKeys.Admin.setTlsConf(parameters, serverConfig);
 
       GrpcTlsConfig clientConfig = new GrpcTlsConfig(
           caClient.getPrivateKey(), caClient.getCertificate(),
-          caClient.getCACertificate(), false);
+          caList, false);
       GrpcConfigKeys.Client.setTlsConf(parameters, clientConfig);
     }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 8b37c5b..5bb9fb1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import java.io.IOException;
+import java.security.cert.X509Certificate;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -40,6 +41,7 @@
 import org.apache.hadoop.hdds.security.token.BlockTokenVerifier;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.utils.HAUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
@@ -167,8 +169,14 @@
     blockDeletingService =
         new BlockDeletingService(this, svcInterval.toMillis(), serviceTimeout,
             TimeUnit.MILLISECONDS, config);
-    tlsClientConfig = RatisHelper.createTlsClientConfig(
-        secConf, certClient != null ? certClient.getCACertificate() : null);
+
+    List< X509Certificate > x509Certificates = null;
+    if (certClient != null) {
+      x509Certificates = HAUtils.buildCAX509List(certClient, conf);
+    }
+
+    tlsClientConfig = RatisHelper.createTlsClientConfig(secConf,
+        x509Certificates);
 
     isStarted = new AtomicBoolean(false);
   }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
index 0ab5d7e..67d4daf 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
@@ -265,12 +265,16 @@
    */
   @Override
   public String getCACertificate() throws IOException {
+    return getCACert().getX509Certificate();
+  }
+
+
+  public SCMGetCertResponseProto getCACert() throws IOException {
     SCMGetCACertificateRequestProto protoIns = SCMGetCACertificateRequestProto
         .getDefaultInstance();
     return submitRequest(Type.GetCACertificate,
         builder -> builder.setGetCACertificateRequest(protoIns))
-        .getGetCertResponseProto().getX509Certificate();
-
+        .getGetCertResponseProto();
   }
 
   /**
@@ -301,7 +305,7 @@
   public String getRootCACertificate() throws IOException {
     SCMGetCACertificateRequestProto protoIns = SCMGetCACertificateRequestProto
         .getDefaultInstance();
-    return submitRequest(Type.GetCACertificate,
+    return submitRequest(Type.GetRootCACertificate,
         builder -> builder.setGetCACertificateRequest(protoIns))
         .getGetCertResponseProto().getX509RootCACertificate();
   }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
index d982cf5..bdd7a20 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
 import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -70,11 +71,20 @@
   private final int maxRetryCount;
   private final long retryInterval;
 
+  private final UserGroupInformation ugi;
+
 
   public SCMBlockLocationFailoverProxyProvider(ConfigurationSource conf) {
     this.conf = conf;
     this.scmVersion = RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
 
+    try {
+      this.ugi = UserGroupInformation.getCurrentUser();
+    } catch (IOException ex) {
+      LOG.error("Unable to fetch user credentials from UGI", ex);
+      throw new RuntimeException(ex);
+    }
+
     // Set some constant for non-HA.
     if (scmServiceId == null) {
       scmServiceId = SCM_DUMMY_SERVICE_ID;
@@ -230,10 +240,16 @@
         LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
     RPC.setProtocolEngine(hadoopConf, ScmBlockLocationProtocolPB.class,
         ProtobufRpcEngine.class);
-    return RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
-        scmAddress, UserGroupInformation.getCurrentUser(), hadoopConf,
+    // FailoverOnNetworkException ensures that the IPC layer does not attempt
+    // retries on the same SCM in case of connection exception. This retry
+    // policy essentially results in TRY_ONCE_THEN_FAIL.
+    RetryPolicy connectionRetryPolicy = RetryPolicies
+        .failoverOnNetworkException(0);
+    return RPC.getProtocolProxy(ScmBlockLocationProtocolPB.class, scmVersion,
+        scmAddress, ugi, hadoopConf,
         NetUtils.getDefaultSocketFactory(hadoopConf),
-        (int)conf.getObject(SCMClientConfig.class).getRpcTimeOut());
+        (int)conf.getObject(SCMClientConfig.class).getRpcTimeOut(),
+        connectionRetryPolicy).getProxy();
   }
 
   public RetryPolicy getSCMBlockLocationRetryPolicy(String newLeader) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java
index ff13122..96f86b8 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
@@ -70,9 +71,18 @@
   private final int maxRetryCount;
   private final long retryInterval;
 
+  private final UserGroupInformation ugi;
+
 
   public SCMContainerLocationFailoverProxyProvider(ConfigurationSource conf) {
     this.conf = conf;
+
+    try {
+      this.ugi = UserGroupInformation.getCurrentUser();
+    } catch (IOException ex) {
+      LOG.error("Unable to fetch user credentials from UGI", ex);
+      throw new RuntimeException(ex);
+    }
     this.scmVersion = RPC.getProtocolVersion(
         StorageContainerLocationProtocolPB.class);
 
@@ -235,11 +245,16 @@
         LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
     RPC.setProtocolEngine(hadoopConf, StorageContainerLocationProtocolPB.class,
         ProtobufRpcEngine.class);
-    return RPC.getProxy(
+    // FailoverOnNetworkException ensures that the IPC layer does not attempt
+    // retries on the same SCM in case of connection exception. This retry
+    // policy essentially results in TRY_ONCE_THEN_FAIL.
+    RetryPolicy connectionRetryPolicy = RetryPolicies
+        .failoverOnNetworkException(0);
+    return RPC.getProtocolProxy(
         StorageContainerLocationProtocolPB.class,
-        scmVersion, scmAddress, UserGroupInformation.getCurrentUser(),
+        scmVersion, scmAddress, ugi,
         hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf),
-        (int)scmClientConfig.getRpcTimeOut());
+        (int)scmClientConfig.getRpcTimeOut(), connectionRetryPolicy).getProxy();
   }
 
   public RetryPolicy getRetryPolicy() {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
index a2d2fb3..91e0dca 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
@@ -199,7 +199,7 @@
    * @return the new proxy index
    */
   private synchronized int incrementProxyIndex() {
-    currentProxyIndex = (currentProxyIndex + 1) % scmProxies.size();
+    currentProxyIndex = (currentProxyIndex + 1) % scmProxyInfoMap.size();
     currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
     return currentProxyIndex;
   }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java
index 0a669b9..2f3657d 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java
@@ -54,6 +54,9 @@
   void storeValidCertificate(BigInteger serialID,
       X509Certificate certificate, NodeType role) throws IOException;
 
+  void storeValidScmCertificate(BigInteger serialID,
+      X509Certificate certificate) throws IOException;
+
   /**
    * Check certificate serialID exists or not. If exists throws an exception.
    * @param serialID
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
index 39f610c..35122c7 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
@@ -67,6 +67,7 @@
 
   /**
    * Sign function signs a Certificate.
+   *
    * @param config - Security Config.
    * @param caPrivate - CAs private Key.
    * @param caCertificate - CA Certificate.
@@ -81,7 +82,7 @@
    */
   @SuppressWarnings("ParameterNumber")
   @Override
-  public  X509CertificateHolder sign(
+  public X509CertificateHolder sign(
       SecurityConfig config,
       PrivateKey caPrivate,
       X509CertificateHolder caCertificate,
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
index 0ec4d42..fb51227 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
 import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.security.PrivateKey;
 import java.security.PublicKey;
@@ -218,4 +219,46 @@
    */
   String getComponentName();
 
+  /**
+   * Return the latest Root CA certificate known to the client.
+   * @return latest Root CA certificate known to the client.
+   */
+  X509Certificate getRootCACertificate();
+
+  /**
+   * Store RootCA certificate.
+   * @param pemEncodedCert PEM encoded root CA certificate to store.
+   * @param force whether to overwrite an existing stored certificate.
+   * @throws CertificateException
+   */
+  void storeRootCACertificate(String pemEncodedCert, boolean force)
+      throws CertificateException;
+
+  /**
+   * Return the pem encoded CA certificate list.
+   *
+   * If initialized return list of pem encoded CA certificates, else return
+   * null.
+   * @return list of pem encoded CA certificates.
+   */
+  List<String> getCAList();
+
+  /**
+   * Return the pem encoded CA certificate list.
+   *
+   * If list is null, fetch the list from SCM and returns the list.
+   * If list is not null, return the pem encoded CA certificate list.
+   *
+   * @return list of pem encoded CA certificates.
+   * @throws IOException
+   */
+  List<String> listCA() throws IOException;
+
+  /**
+   * Update and returns the pem encoded CA certificate list.
+   * @return list of pem encoded CA certificates.
+   * @throws IOException
+   */
+  List<String> updateCAList() throws IOException;
+
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
index 1b04356..19c0e78 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
@@ -23,7 +23,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -42,25 +41,19 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
-import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
-import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
 import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
 import org.apache.hadoop.hdds.security.x509.keys.KeyCodec;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.FilenameUtils;
@@ -88,6 +81,8 @@
   private static final String CERT_FILE_NAME_FORMAT = "%s.crt";
   private static final String CA_CERT_PREFIX = "CA-";
   private static final int CA_CERT_PREFIX_LEN = 3;
+  private static final String ROOT_CA_CERT_PREFIX = "ROOTCA-";
+  private static final int ROOT_CA_PREFIX_LEN = 7;
   private final Logger logger;
   private final SecurityConfig securityConfig;
   private final KeyCodec keyCodec;
@@ -97,7 +92,10 @@
   private Map<String, X509Certificate> certificateMap;
   private String certSerialId;
   private String caCertId;
+  private String rootCaCertId;
   private String component;
+  private List<String> pemEncodedCACerts = null;
+  private final Lock lock;
 
   DefaultCertificateClient(SecurityConfig securityConfig, Logger log,
       String certSerialId, String component) {
@@ -108,6 +106,7 @@
     this.certificateMap = new ConcurrentHashMap<>();
     this.certSerialId = certSerialId;
     this.component = component;
+    lock = new ReentrantLock();
 
     loadAllCertificates();
   }
@@ -127,6 +126,7 @@
         CertificateCodec certificateCodec =
             new CertificateCodec(securityConfig, component);
         long latestCaCertSerailId = -1L;
+        long latestRootCaCertSerialId = -1L;
         for (File file : certFiles) {
           if (file.isFile()) {
             try {
@@ -149,6 +149,16 @@
                     latestCaCertSerailId = tmpCaCertSerailId;
                   }
                 }
+
+                if (file.getName().startsWith(ROOT_CA_CERT_PREFIX)) {
+                  String certFileName = FilenameUtils.getBaseName(
+                      file.getName());
+                  long tmpRootCaCertSerailId = NumberUtils.toLong(
+                      certFileName.substring(ROOT_CA_PREFIX_LEN));
+                  if (tmpRootCaCertSerailId > latestRootCaCertSerialId) {
+                    latestRootCaCertSerialId = tmpRootCaCertSerailId;
+                  }
+                }
                 getLogger().info("Added certificate from file:{}.",
                     file.getAbsolutePath());
               } else {
@@ -164,6 +174,9 @@
         if (latestCaCertSerailId != -1) {
           caCertId = Long.toString(latestCaCertSerailId);
         }
+        if (latestRootCaCertSerialId != -1) {
+          rootCaCertId = Long.toString(latestRootCaCertSerialId);
+        }
       }
     }
   }
@@ -282,7 +295,8 @@
     getLogger().info("Getting certificate with certSerialId:{}.",
         certId);
     try {
-      SCMSecurityProtocol scmSecurityProtocolClient = getScmSecurityClient(
+      SCMSecurityProtocol scmSecurityProtocolClient =
+          HddsServerUtil.getScmSecurityClient(
           (OzoneConfiguration) securityConfig.getConfiguration());
       String pemEncodedCert =
           scmSecurityProtocolClient.getCertificate(certId);
@@ -471,6 +485,8 @@
             builder.addIpAddress(ip.getHostAddress());
             if(validator.isValid(ip.getCanonicalHostName())) {
               builder.addDnsName(ip.getCanonicalHostName());
+            } else {
+              getLogger().error("Invalid domain {}", ip.getCanonicalHostName());
             }
           });
     } catch (IOException e) {
@@ -818,29 +834,82 @@
     return logger;
   }
 
-  /**
-   * Create a scm security client, used to get SCM signed certificate.
-   *
-   * @return {@link SCMSecurityProtocol}
-   */
-  private static SCMSecurityProtocol getScmSecurityClient(
-      OzoneConfiguration conf) throws IOException {
-    RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
-        ProtobufRpcEngine.class);
-    long scmVersion =
-        RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
-    InetSocketAddress scmSecurityProtoAdd =
-        HddsServerUtil.getScmAddressForSecurityProtocol(conf);
-    SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient =
-        new SCMSecurityProtocolClientSideTranslatorPB(
-            RPC.getProxy(SCMSecurityProtocolPB.class, scmVersion,
-                scmSecurityProtoAdd, UserGroupInformation.getCurrentUser(),
-                conf, NetUtils.getDefaultSocketFactory(conf),
-                Client.getRpcTimeout(conf)));
-    return scmSecurityClient;
-  }
-
   public String getComponentName() {
     return null;
   }
+
+  @Override
+  public X509Certificate getRootCACertificate() {
+    if (rootCaCertId != null) {
+      return certificateMap.get(rootCaCertId);
+    }
+    return null;
+  }
+
+  @Override
+  public void storeRootCACertificate(String pemEncodedCert, boolean force)
+      throws CertificateException {
+    CertificateCodec certificateCodec = new CertificateCodec(securityConfig,
+        component);
+    try {
+      Path basePath = securityConfig.getCertificateLocation(component);
+
+      X509Certificate cert =
+          CertificateCodec.getX509Certificate(pemEncodedCert);
+      String certName = String.format(CERT_FILE_NAME_FORMAT,
+          cert.getSerialNumber().toString());
+
+      certName = ROOT_CA_CERT_PREFIX + certName;
+      rootCaCertId = cert.getSerialNumber().toString();
+
+      certificateCodec.writeCertificate(basePath, certName,
+          pemEncodedCert, force);
+      certificateMap.putIfAbsent(cert.getSerialNumber().toString(), cert);
+    } catch (IOException | java.security.cert.CertificateException e) {
+      throw new CertificateException("Error while storing Root CA " +
+          "certificate.", e, CERTIFICATE_ERROR);
+    }
+  }
+
+  @Override
+  public List<String> getCAList() {
+    lock.lock();
+    try {
+      return pemEncodedCACerts;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public List<String> listCA() throws IOException {
+    lock.lock();
+    try {
+      if (pemEncodedCACerts == null) {
+        updateCAList();
+      }
+      return pemEncodedCACerts;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public List<String> updateCAList() throws IOException {
+    lock.lock();
+    try {
+      SCMSecurityProtocol scmSecurityProtocolClient =
+          HddsServerUtil.getScmSecurityClient(
+              securityConfig.getConfiguration());
+      pemEncodedCACerts =
+          scmSecurityProtocolClient.listCACertificate();
+      return pemEncodedCACerts;
+    } catch (Exception e) {
+      getLogger().error("Error during updating CA list", e);
+      throw new CertificateException("Error during updating CA list", e,
+          CERTIFICATE_ERROR);
+    } finally {
+      lock.unlock();
+    }
+  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
index 6bf85ca..db129f4 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
@@ -18,15 +18,25 @@
 
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.function.SupplierWithIOException;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto;
+import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
+import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdds.utils.db.DBDefinition;
 import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
@@ -35,6 +45,7 @@
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import org.apache.hadoop.util.Time;
 import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.FileUtils;
 import org.slf4j.Logger;
@@ -47,7 +58,10 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
 import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
@@ -102,7 +116,7 @@
    * @throws IOException
    */
   public static ScmBlockLocationProtocol getScmBlockClient(
-      OzoneConfiguration conf) throws IOException {
+      OzoneConfiguration conf) {
     ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
         new ScmBlockLocationProtocolClientSideTranslatorPB(
             new SCMBlockLocationFailoverProxyProvider(conf));
@@ -111,6 +125,17 @@
             conf);
   }
 
+  public static StorageContainerLocationProtocol getScmContainerClient(
+      ConfigurationSource conf) {
+    SCMContainerLocationFailoverProxyProvider proxyProvider =
+        new SCMContainerLocationFailoverProxyProvider(conf);
+    StorageContainerLocationProtocol scmContainerClient =
+        TracingUtil.createProxy(
+            new StorageContainerLocationProtocolClientSideTranslatorPB(
+                proxyProvider), StorageContainerLocationProtocol.class, conf);
+    return scmContainerClient;
+  }
+
   /**
    * Replace the current DB with the new DB checkpoint.
    *
@@ -326,4 +351,146 @@
       }
     }
   }
+
+  /**
+   * Build CA list which need to be passed to client.
+   *
+   * If certificate client is null, obtain the list of CA using SCM security
+   * client, else it uses certificate client.
+   * @param certClient
+   * @param configuration
+   * @return list of CA
+   * @throws IOException
+   */
+  public static List<String> buildCAList(CertificateClient certClient,
+      ConfigurationSource configuration) throws IOException {
+    //TODO: make it configurable.
+    List<String> caCertPemList;
+    if (certClient != null) {
+      caCertPemList = new ArrayList<>();
+      if (!SCMHAUtils.isSCMHAEnabled(configuration)) {
+        if (certClient.getRootCACertificate() != null) {
+          caCertPemList.add(CertificateCodec.getPEMEncodedString(
+              certClient.getRootCACertificate()));
+        }
+        caCertPemList.add(CertificateCodec.getPEMEncodedString(
+            certClient.getCACertificate()));
+      } else {
+        Collection<String> scmNodes = SCMHAUtils.getSCMNodeIds(configuration);
+        int expectedCount = scmNodes.size() + 1;
+        if (scmNodes.size() > 1) {
+          // First check if cert client has ca list initialized.
+          // This is done so that when this method is called multiple times,
+          // we don't make a call to SCM; we return the in-memory list.
+          caCertPemList = certClient.getCAList();
+          if (caCertPemList != null && caCertPemList.size() == expectedCount) {
+            return caCertPemList;
+          }
+          caCertPemList = waitForCACerts(() -> certClient.updateCAList(),
+              expectedCount);
+          checkCertCount(caCertPemList.size(), expectedCount);
+        } else {
+          caCertPemList = certClient.listCA();
+        }
+      }
+    } else {
+      SCMSecurityProtocolClientSideTranslatorPB scmSecurityProtocolClient =
+          HddsServerUtil.getScmSecurityClient(configuration);
+      if (!SCMHAUtils.isSCMHAEnabled(configuration)) {
+        caCertPemList = new ArrayList<>();
+        SCMGetCertResponseProto scmGetCertResponseProto =
+            scmSecurityProtocolClient.getCACert();
+        if (scmGetCertResponseProto.hasX509Certificate()) {
+          caCertPemList.add(scmGetCertResponseProto.getX509Certificate());
+        }
+        if (scmGetCertResponseProto.hasX509RootCACertificate()) {
+          caCertPemList.add(scmGetCertResponseProto.getX509RootCACertificate());
+        }
+      } else {
+        Collection<String> scmNodes = SCMHAUtils.getSCMNodeIds(configuration);
+        int expectedCount = scmNodes.size() + 1;
+        if (scmNodes.size() > 1) {
+          caCertPemList = waitForCACerts(
+              () -> scmSecurityProtocolClient.listCACertificate(),
+              expectedCount);
+          checkCertCount(caCertPemList.size(), expectedCount);
+        } else {
+          caCertPemList = scmSecurityProtocolClient.listCACertificate();
+        }
+      }
+    }
+    return caCertPemList;
+  }
+
+  private static List<String> waitForCACerts(
+      final SupplierWithIOException<List<String>> applyFunction,
+      int expectedCount) throws IOException {
+    //TODO: make wait time and sleep time configurable if needed.
+    // TODO: If SCMs are bootstrapped later, then listCA need to be
+    //  refetched if listCA size is less than scm ha config node list size.
+    // For now when Client of SCM's are started we compare their node list
+    // size and ca list size if it is as expected, we return the ca list.
+    boolean caListUpToDate;
+    long waitTime = 5 * 60 * 1000L;
+    long retryTime = 10 * 1000L;
+    long currentTime = Time.monotonicNow();
+    List<String> caCertPemList;
+    do {
+      caCertPemList = applyFunction.get();
+      caListUpToDate =
+          caCertPemList.size() == expectedCount;
+      if (!caListUpToDate) {
+        LOG.info("Expected CA list size {}, whereas received CA List size " +
+            "{}. Retry to fetch CA List after {} seconds", expectedCount,
+            caCertPemList.size(), retryTime/1000);
+        try {
+          Thread.sleep(retryTime);
+        } catch (InterruptedException ex) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    } while (!caListUpToDate &&
+        Time.monotonicNow() - currentTime < waitTime);
+    return caCertPemList;
+  }
+
+
+  private static void checkCertCount(int certCount, int expectedCount)
+      throws SCMSecurityException {
+    if (certCount != expectedCount) {
+      LOG.error("Unable to obtain CA list for SCM cluster, obtained CA list " +
+              "size is {}, whereas expected list size is {}",
+          certCount, expectedCount);
+      throw new SCMSecurityException("Unable to obtain complete CA list");
+    }
+  }
+
+  /**
+   * Build CA List in the format of X509Certificate.
+   * If certificate client is null, obtain the list of CA using SCM
+   * security client, else it uses certificate client.
+   * @param certClient
+   * @param conf
+   * @return list of CA X509Certificates.
+   * @throws IOException
+   */
+  public static List<X509Certificate> buildCAX509List(
+      CertificateClient certClient,
+      ConfigurationSource conf) throws IOException {
+    if (certClient != null) {
+      // Do this here to avoid extra conversion of X509 to pem and again to
+      // X509 by buildCAList.
+      if (!SCMHAUtils.isSCMHAEnabled(conf)) {
+        List<X509Certificate> x509Certificates = new ArrayList<>();
+        if (certClient.getRootCACertificate() != null) {
+          x509Certificates.add(certClient.getRootCACertificate());
+        }
+        x509Certificates.add(certClient.getCACertificate());
+        return x509Certificates;
+      }
+    }
+    List<String> pemEncodedCerts = HAUtils.buildCAList(certClient, conf);
+    return OzoneSecurityUtil.convertToX509(pemEncodedCerts);
+  }
+
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index ddc7e04..8539910 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -431,7 +431,7 @@
    * @throws IOException
    */
   public static SCMSecurityProtocolClientSideTranslatorPB getScmSecurityClient(
-      OzoneConfiguration conf) throws IOException {
+      ConfigurationSource conf) throws IOException {
     return new SCMSecurityProtocolClientSideTranslatorPB(
         new SCMSecurityProtocolFailoverProxyProvider(conf,
             UserGroupInformation.getCurrentUser()));
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java
index c60c975..3da7e52 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java
@@ -44,7 +44,11 @@
 
   @Override
   public void checkValidCertID(BigInteger serialID) throws IOException {
+  }
 
+  @Override
+  public void storeValidScmCertificate(BigInteger serialID,
+      X509Certificate certificate) throws IOException {
   }
 
   @Override
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
index 31aac90..e270b27 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
@@ -100,6 +100,7 @@
     MISSING_BLOCK_TOKEN = 13;
     BLOCK_TOKEN_VERIFICATION_FAILED = 14;
     GET_ROOT_CA_CERTIFICATE_FAILED = 15;
+    NOT_A_PRIMARY_SCM = 16;
 }
 /**
 * This message is send by data node to prove its identity and get an SCM
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
index 9b917db..e322125 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
@@ -20,10 +20,10 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmNodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.server.SCMCertStore;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultCAServer;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultCAProfile;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
@@ -32,6 +32,9 @@
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.bouncycastle.cert.X509CertificateHolder;
 import org.bouncycastle.pkcs.PKCS10CertificationRequest;
 import org.slf4j.Logger;
@@ -216,7 +219,7 @@
    * @param scmStorageConfig
    */
   public static CertificateServer initializeRootCertificateServer(
-      OzoneConfiguration config, SCMCertStore scmCertStore,
+      OzoneConfiguration config, CertificateStore scmCertStore,
       SCMStorageConfig scmStorageConfig)
       throws IOException {
     String subject = SCM_ROOT_CA_PREFIX +
@@ -281,4 +284,26 @@
     certCodec.writeCertificate(certificateHolder);
   }
 
+  /**
+   * Create Server TLS parameters required for Ratis Server.
+   * @param conf
+   * @param caClient
+   * @return
+   */
+  public static Parameters createSCMServerTlsParameters(SecurityConfig conf,
+      CertificateClient caClient) {
+    Parameters parameters = new Parameters();
+
+    if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
+      GrpcTlsConfig config = new GrpcTlsConfig(
+          caClient.getPrivateKey(), caClient.getCertificate(),
+          caClient.getCACertificate(), true);
+      GrpcConfigKeys.Server.setTlsConf(parameters, config);
+      GrpcConfigKeys.Admin.setTlsConf(parameters, config);
+      GrpcConfigKeys.Client.setTlsConf(parameters, config);
+      GrpcConfigKeys.TLS.setConf(parameters, config);
+    }
+
+    return parameters;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 4835d03..0702a92 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -344,7 +344,10 @@
     scm.getScmBlockManager().getDeletedBlockLog().reinitialize(
         metadataStore.getDeletedBlocksTXTable());
     if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
-      scm.getCertificateServer().reinitialize(metadataStore);
+      if (scm.getRootCertificateServer() != null) {
+        scm.getRootCertificateServer().reinitialize(metadataStore);
+      }
+      scm.getScmCertificateServer().reinitialize(metadataStore);
     }
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index d9d6595..a7c3253 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -36,8 +36,10 @@
 import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.util.Time;
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -84,9 +86,14 @@
     // persisted in the raft log post leader election. Now, when the primary
     // scm boots up, it has peer info embedded in the raft log and will
     // trigger leader election.
-    this.server =
-        newRaftServer(scm.getScmId(), conf).setStateMachine(stateMachine)
-            .setGroup(RaftGroup.valueOf(groupId)).build();
+
+    Parameters parameters =
+        HASecurityUtils.createSCMServerTlsParameters(new SecurityConfig(conf),
+           scm.getScmCertificateClient());
+    this.server = newRaftServer(scm.getScmId(), conf)
+        .setStateMachine(stateMachine)
+        .setGroup(RaftGroup.valueOf(groupId))
+        .setParameters(parameters).build();
     this.division = server.getDivision(groupId);
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
index cc0c776..a6e4143 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
@@ -44,6 +44,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.Type.GetSCMCertificate;
+
 /**
  * This class is the server-side translator that forwards requests received on
  * {@link SCMSecurityProtocolPB} to the {@link
@@ -170,7 +172,8 @@
             .newBuilder()
             .setResponseCode(ResponseCode.success)
             .setX509Certificate(certificate)
-            .setX509CACertificate(impl.getCACertificate());
+            .setX509CACertificate(impl.getCACertificate())
+            .setX509RootCACertificate(impl.getRootCACertificate());
 
     return builder.build();
 
@@ -194,7 +197,8 @@
             .newBuilder()
             .setResponseCode(ResponseCode.success)
             .setX509Certificate(certificate)
-            .setX509CACertificate(impl.getCACertificate());
+            .setX509CACertificate(impl.getRootCACertificate())
+            .setX509RootCACertificate(impl.getRootCACertificate());
 
     return builder.build();
 
@@ -216,7 +220,8 @@
             .newBuilder()
             .setResponseCode(ResponseCode.success)
             .setX509Certificate(certificate)
-            .setX509CACertificate(impl.getCACertificate());
+            .setX509CACertificate(impl.getCACertificate())
+            .setX509RootCACertificate(impl.getRootCACertificate());
     return builder.build();
 
   }
@@ -243,7 +248,9 @@
         SCMGetCertResponseProto
             .newBuilder()
             .setResponseCode(ResponseCode.success)
-            .setX509Certificate(certificate);
+            .setX509Certificate(certificate)
+            .setX509CACertificate(certificate)
+            .setX509RootCACertificate(impl.getRootCACertificate());
     return builder.build();
 
   }
@@ -271,6 +278,7 @@
         SCMGetCertResponseProto
             .newBuilder()
             .setResponseCode(ResponseCode.success)
+            .setX509Certificate(rootCACertificate)
             .setX509RootCACertificate(rootCACertificate);
     return builder.build();
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
index ee4bc20..e76d200 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
@@ -101,7 +101,7 @@
    * @param certificate - Certificate to persist.
    * @throws IOException - on Failure.
    */
-  private void storeValidScmCertificate(BigInteger serialID,
+  public void storeValidScmCertificate(BigInteger serialID,
       X509Certificate certificate) throws IOException {
     lock.lock();
     try {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
index 5a1ecf3..e3fd23e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
@@ -57,7 +57,7 @@
 import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.CERTIFICATE_NOT_FOUND;
 import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.GET_CA_CERT_FAILED;
 import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.GET_CERTIFICATE_FAILED;
-import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.GET_ROOT_CA_CERT_FAILED;
+import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.NOT_A_PRIMARY_SCM;
 import static org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateApprover.ApprovalType.KERBEROS_TRUSTED;
 
 /**
@@ -70,17 +70,23 @@
 
   private static final Logger LOGGER = LoggerFactory
       .getLogger(SCMSecurityProtocolServer.class);
-  private final CertificateServer certificateServer;
+  private final CertificateServer rootCertificateServer;
+  private final CertificateServer scmCertificateServer;
+  private final X509Certificate rootCACertificate;
   private final RPC.Server rpcServer;
   private final InetSocketAddress rpcAddress;
   private final ProtocolMessageMetrics metrics;
   private final StorageContainerManager storageContainerManager;
 
   SCMSecurityProtocolServer(OzoneConfiguration conf,
-      CertificateServer certificateServer, StorageContainerManager scm)
+      CertificateServer rootCertificateServer,
+      CertificateServer scmCertificateServer,
+      X509Certificate rootCACert, StorageContainerManager scm)
       throws IOException {
     this.storageContainerManager = scm;
-    this.certificateServer = certificateServer;
+    this.rootCertificateServer = rootCertificateServer;
+    this.scmCertificateServer = scmCertificateServer;
+    this.rootCACertificate = rootCACert;
     final int handlerCount =
         conf.getInt(ScmConfigKeys.OZONE_SCM_SECURITY_HANDLER_COUNT_KEY,
             ScmConfigKeys.OZONE_SCM_SECURITY_HANDLER_COUNT_DEFAULT);
@@ -155,19 +161,28 @@
   public String getSCMCertificate(ScmNodeDetailsProto scmNodeDetails,
       String certSignReq) throws IOException {
     Objects.requireNonNull(scmNodeDetails);
-    LOGGER.info("Processing CSR for scm {}, nodeId: {}",
-        scmNodeDetails.getHostName(), scmNodeDetails.getScmNodeId());
+    String primaryScmId =
+        storageContainerManager.getScmStorageConfig().getPrimaryScmNodeId();
 
-    // Check clusterID
-    if (storageContainerManager.getClusterId().equals(
-        scmNodeDetails.getClusterId())) {
-      throw new IOException("SCM ClusterId mismatch. Peer SCM ClusterId " +
-          scmNodeDetails.getClusterId() + ", primary SCM ClusterId "
-          + storageContainerManager.getClusterId());
+    if (primaryScmId != null &&
+        primaryScmId.equals(storageContainerManager.getScmId())) {
+      LOGGER.info("Processing CSR for scm {}, nodeId: {}",
+          scmNodeDetails.getHostName(), scmNodeDetails.getScmNodeId());
+
+      // Check clusterID
+      if (!storageContainerManager.getClusterId().equals(
+          scmNodeDetails.getClusterId())) {
+        throw new IOException("SCM ClusterId mismatch. Peer SCM ClusterId " +
+            scmNodeDetails.getClusterId() + ", primary SCM ClusterId "
+            + storageContainerManager.getClusterId());
+      }
+
+      return getEncodedCertToString(certSignReq, NodeType.SCM);
+    } else {
+      throw new SCMSecurityException("Get SCM Certificate can be run only " +
+          "primary SCM", NOT_A_PRIMARY_SCM);
     }
 
-    return getEncodedCertToString(certSignReq, NodeType.SCM);
-
   }
 
   /**
@@ -179,9 +194,15 @@
    */
   private String getEncodedCertToString(String certSignReq, NodeType nodeType)
       throws IOException {
-    Future<X509CertificateHolder> future =
-        certificateServer.requestCertificate(certSignReq,
-            KERBEROS_TRUSTED, nodeType);
+
+    Future<X509CertificateHolder> future;
+    if (nodeType == NodeType.SCM) {
+      future = rootCertificateServer.requestCertificate(certSignReq,
+              KERBEROS_TRUSTED, nodeType);
+    } else {
+      future = scmCertificateServer.requestCertificate(certSignReq,
+          KERBEROS_TRUSTED, nodeType);
+    }
     try {
       return CertificateCodec.getPEMEncodedString(future.get());
     } catch (InterruptedException e) {
@@ -226,7 +247,7 @@
         certSerialId);
     try {
       X509Certificate certificate =
-          certificateServer.getCertificate(certSerialId);
+          scmCertificateServer.getCertificate(certSerialId);
       if (certificate != null) {
         return CertificateCodec.getPEMEncodedString(certificate);
       }
@@ -234,7 +255,7 @@
       throw new SCMSecurityException("getCertificate operation failed. ", e,
           GET_CERTIFICATE_FAILED);
     }
-    LOGGER.debug("Certificate with serial id {} not found.", certSerialId);
+    LOGGER.info("Certificate with serial id {} not found.", certSerialId);
     throw new SCMSecurityException("Certificate not found",
         CERTIFICATE_NOT_FOUND);
   }
@@ -249,7 +270,7 @@
     LOGGER.debug("Getting CA certificate.");
     try {
       return CertificateCodec.getPEMEncodedString(
-          certificateServer.getCACertificate());
+          scmCertificateServer.getCACertificate());
     } catch (CertificateException e) {
       throw new SCMSecurityException("getRootCertificate operation failed. ",
           e, GET_CA_CERT_FAILED);
@@ -269,7 +290,7 @@
   public List<String> listCertificate(NodeType role,
       long startSerialId, int count, boolean isRevoked) throws IOException {
     List<X509Certificate> certificates =
-        certificateServer.listCertificate(role, startSerialId, count,
+        scmCertificateServer.listCertificate(role, startSerialId, count,
             isRevoked);
     List<String> results = new ArrayList<>(certificates.size());
     for (X509Certificate cert : certificates) {
@@ -288,22 +309,17 @@
   public List<String> listCACertificate() throws IOException {
     List<String> caCerts =
         listCertificate(NodeType.SCM, 0, 10, false);
-    caCerts.add(getRootCACertificate());
     return caCerts;
   }
 
   @Override
   public String getRootCACertificate() throws IOException {
     LOGGER.debug("Getting Root CA certificate.");
-    //TODO: This code will be modified after HDDS-4897 is merged and
-    // integrated. For now getting RootCA cert from certificateServer.
-    try {
-      return CertificateCodec.getPEMEncodedString(
-          certificateServer.getCACertificate());
-    } catch (CertificateException e) {
-      throw new SCMSecurityException("getRootCertificate operation failed. ",
-          e, GET_ROOT_CA_CERT_FAILED);
+    if (storageContainerManager.getScmStorageConfig()
+        .getPrimaryScmNodeId() != null) {
+      return CertificateCodec.getPEMEncodedString(rootCACertificate);
     }
+    return null;
   }
 
   public RPC.Server getRpcServer() {
@@ -337,4 +353,12 @@
     getRpcServer().join();
   }
 
+  public CertificateServer getRootCertificateServer() {
+    return rootCertificateServer;
+  }
+
+
+  public CertificateServer getScmCertificateServer() {
+    return scmCertificateServer;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStarterInterface.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStarterInterface.java
index 1ae29b1..f037be6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStarterInterface.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStarterInterface.java
@@ -22,6 +22,8 @@
 package org.apache.hadoop.hdds.scm.server;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+
 import java.io.IOException;
 
 /**
@@ -34,6 +36,6 @@
   boolean init(OzoneConfiguration conf, String clusterId)
       throws IOException;
   boolean bootStrap(OzoneConfiguration conf)
-      throws IOException;
+      throws IOException, AuthenticationException;
   String generateClusterId();
 }
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java
index 5d9f4de..4a346dd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java
@@ -27,6 +27,7 @@
 import java.util.Properties;
 import java.util.UUID;
 
+import static org.apache.hadoop.ozone.OzoneConsts.PRIMARY_SCM_NODE_ID;
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_CERT_SERIAL_ID;
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID;
 import static org.apache.hadoop.ozone.OzoneConsts.STORAGE_DIR;
@@ -94,4 +95,26 @@
   public String getScmCertSerialId() {
     return getStorageInfo().getProperty(SCM_CERT_SERIAL_ID);
   }
+
+  /**
+   * Set primary SCM node ID.
+   * @param scmId primary SCM node ID to persist in the version file.
+   * @throws IOException if the property cannot be set.
+   */
+  public void setPrimaryScmNodeId(String scmId) throws IOException {
+    getStorageInfo().setProperty(PRIMARY_SCM_NODE_ID, scmId);
+
+  }
+
+  /**
+   * Retrieves the primary SCM node ID from the version file.
+   * @return Primary SCM node ID.
+   */
+  public String getPrimaryScmNodeId() {
+    return getStorageInfo().getProperty(PRIMARY_SCM_NODE_ID);
+  }
+
+  public boolean checkPrimarySCMIdInitialized() {
+    return getPrimaryScmNodeId() != null;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 22d41fd..bfd45d8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -23,6 +23,7 @@
 
 import javax.management.ObjectName;
 import java.io.IOException;
+import java.math.BigInteger;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import com.google.common.annotations.VisibleForTesting;
@@ -36,6 +37,7 @@
 import java.security.cert.X509Certificate;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
@@ -54,10 +56,12 @@
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
 import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManagerImpl;
 import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
 import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
 import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
@@ -66,6 +70,8 @@
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultProfile;
+import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.utils.HAUtils;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.ScmConfig;
@@ -108,11 +114,9 @@
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
 import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.PipelineChoosePolicyFactory;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
-import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultCAServer;
-import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -122,6 +126,7 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.ozone.common.Storage.StorageState;
@@ -132,16 +137,15 @@
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.util.JvmPauseMonitor;
-import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.hdds.utils.HAUtils.checkSecurityAndSCMHAEnabled;
+import static org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore.CertType.VALID_CERTS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
 import static org.apache.hadoop.ozone.OzoneConsts.CRL_SEQUENCE_ID_KEY;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_ROOT_CA_COMPONENT_NAME;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_ROOT_CA_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_SUB_CA_PREFIX;
 
 /**
  * StorageContainerManager is the main entry point for the service that
@@ -186,6 +190,7 @@
   private NodeDecommissionManager scmDecommissionManager;
 
   private SCMMetadataStore scmMetadataStore;
+  private CertificateStore certificateStore;
   private SCMHAManager scmHAManager;
   private SCMContext scmContext;
   private SequenceIdGenerator sequenceIdGen;
@@ -215,14 +220,14 @@
   private final LeaseManager<Long> commandWatcherLeaseManager;
 
   private SCMSafeModeManager scmSafeModeManager;
-  private CertificateServer certificateServer;
-  private GrpcTlsConfig grpcTlsConfig;
+  private SCMCertificateClient scmCertificateClient;
 
   private JvmPauseMonitor jvmPauseMonitor;
   private final OzoneConfiguration configuration;
   private SCMContainerMetrics scmContainerMetrics;
   private SCMContainerPlacementMetrics placementMetrics;
   private MetricsSystem ms;
+  private String primaryScmNodeId;
 
   /**
    *  Network topology Map.
@@ -287,6 +292,9 @@
           "failure.", ResultCodes.SCM_NOT_INITIALIZED);
     }
 
+    primaryScmNodeId = scmStorageConfig.getPrimaryScmNodeId();
+    initializeCertificateClient();
+
     /**
      * Important : This initialization sequence is assumed by some of our tests.
      * The testSecureOzoneCluster assumes that security checks have to be
@@ -294,7 +302,7 @@
      * add any other initialization above the Security checks please.
      */
     if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
-      loginAsSCMUser(conf);
+      loginAsSCMUserIfSecurityEnabled(scmHANodeDetails, conf);
     }
 
     // Creates the SCM DBs or opens them if it exists.
@@ -319,7 +327,6 @@
       // if no Security, we do not create a Certificate Server at all.
       // This allows user to boot SCM without security temporarily
       // and then come back and enable it without any impact.
-      certificateServer = null;
       securityProtocolServer = null;
     }
 
@@ -400,6 +407,14 @@
     registerMetricsSource(this);
   }
 
+  private void initializeCertificateClient() {
+    if (scmStorageConfig.checkPrimarySCMIdInitialized()) {
+      scmCertificateClient = new SCMCertificateClient(
+          new SecurityConfig(configuration),
+          scmStorageConfig.getScmCertSerialId());
+    }
+  }
+
   public OzoneConfiguration getConfiguration() {
     return configuration;
   }
@@ -558,44 +573,114 @@
    */
   private void initializeCAnSecurityProtocol(OzoneConfiguration conf,
       SCMConfigurator configurator) throws IOException {
-    if(configurator.getCertificateServer() != null) {
-      this.certificateServer = configurator.getCertificateServer();
-    } else {
-      // This assumes that SCM init has run, and DB metadata stores are created.
-      certificateServer = initializeCertificateServer(
-          getScmStorageConfig().getClusterID(),
-          getScmStorageConfig().getScmId());
+
+
+    // TODO: Support Certificate Server loading via Class Name loader.
+    // So it is easy to use different Certificate Servers if needed.
+    if(this.scmMetadataStore == null) {
+      LOG.error("Cannot initialize Certificate Server without a valid meta " +
+          "data layer.");
+      throw new SCMException("Cannot initialize CA without a valid metadata " +
+          "store", ResultCodes.SCM_NOT_INITIALIZED);
     }
-    // TODO: Support Intermediary CAs in future.
-    certificateServer.init(new SecurityConfig(conf),
-        CertificateServer.CAType.SELF_SIGNED_CA);
-    securityProtocolServer = new SCMSecurityProtocolServer(conf,
-        certificateServer, this);
 
-    grpcTlsConfig = createTlsClientConfigForSCM(new SecurityConfig(conf),
-            certificateServer);
-  }
+    certificateStore =
+        new SCMCertStore.Builder().setMetadaStore(scmMetadataStore)
+            .setRatisServer(scmHAManager.getRatisServer())
+            .setCRLSequenceId(getLastSequenceIdForCRL()).build();
 
-  public CertificateServer getCertificateServer() {
-    return certificateServer;
-  }
 
-  // For Internal gRPC client from SCM to DN with gRPC TLS
-  static GrpcTlsConfig createTlsClientConfigForSCM(SecurityConfig conf,
-      CertificateServer certificateServer) throws IOException {
-    if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
-      try {
-        X509Certificate caCert =
-            CertificateCodec.getX509Certificate(
-                certificateServer.getCACertificate());
-        return new GrpcTlsConfig(null, null,
-            caCert, false);
-      } catch (CertificateException ex) {
-        throw new SCMSecurityException("Fail to find SCM CA certificate.", ex);
+    final CertificateServer scmCertificateServer;
+    final CertificateServer rootCertificateServer;
+    // If primary SCM node Id is set it means this is a cluster which has
+    // performed init with SCM HA version code.
+    if (scmStorageConfig.checkPrimarySCMIdInitialized()) {
+      // Start specific instance SCM CA server.
+      String subject = SCM_SUB_CA_PREFIX +
+          InetAddress.getLocalHost().getHostName();
+      if (configurator.getCertificateServer() != null) {
+        scmCertificateServer = configurator.getCertificateServer();
+      } else {
+        scmCertificateServer = new DefaultCAServer(subject,
+            scmStorageConfig.getClusterID(), scmStorageConfig.getScmId(),
+            certificateStore, new DefaultProfile(),
+            scmCertificateClient.getComponentName());
+        // INTERMEDIARY_CA which issues certs to DN and OM.
+        scmCertificateServer.init(new SecurityConfig(configuration),
+            CertificateServer.CAType.INTERMEDIARY_CA);
       }
+
+      if (primaryScmNodeId.equals(scmStorageConfig.getScmId())) {
+        if (configurator.getCertificateServer() != null) {
+          rootCertificateServer = configurator.getCertificateServer();
+        } else {
+          rootCertificateServer =
+              HASecurityUtils.initializeRootCertificateServer(
+              conf, certificateStore, scmStorageConfig);
+        }
+        persistPrimarySCMCerts();
+      } else {
+        rootCertificateServer = null;
+      }
+    } else {
+      // On an upgraded cluster the primary SCM nodeId will not be set, as
+      // init will not be run again after the upgrade. So for an upgraded
+      // cluster where init has not happened again, we keep the pre-HA setup
+      // with a single CA server issuing certificates to DNs and OMs.
+      rootCertificateServer =
+          HASecurityUtils.initializeRootCertificateServer(conf,
+              certificateStore, scmStorageConfig);
+      scmCertificateServer = rootCertificateServer;
     }
-    return null;
+
+    // We need to pass getCACertificate as the root CA certificate,
+    // because for the SCM itself the CA is the root CA.
+    securityProtocolServer = new SCMSecurityProtocolServer(conf,
+        rootCertificateServer, scmCertificateServer,
+        scmCertificateClient.getCACertificate(), this);
   }
+
+  /**
+   * Persist the primary SCM's root CA cert and sub-CA cert to DB.
+   * @throws IOException if a certificate cannot be stored.
+   */
+  private void persistPrimarySCMCerts() throws IOException {
+    BigInteger certSerial =
+        scmCertificateClient.getCertificate().getSerialNumber();
+    // Store the certificate in DB. On primary SCM when init happens, the
+    // certificate is not persisted to DB. As we don't have MetadataStore
+    // and ratis server initialized with statemachine. We need to do only
+    // for primary scm, for other bootstrapped scm's certificates will be
+    // persisted via ratis.
+    if (certificateStore.getCertificateByID(certSerial,
+        VALID_CERTS) == null) {
+      LOG.info("Storing sub-ca certificate serialId {} on primary SCM",
+          certSerial);
+      certificateStore.storeValidScmCertificate(
+          certSerial, scmCertificateClient.getCertificate());
+    }
+    X509Certificate rootCACert = scmCertificateClient.getCACertificate();
+    if (certificateStore.getCertificateByID(rootCACert.getSerialNumber(),
+        VALID_CERTS) == null) {
+      LOG.info("Storing root certificate serialId {}",
+          rootCACert.getSerialNumber());
+      certificateStore.storeValidScmCertificate(
+          rootCACert.getSerialNumber(), rootCACert);
+    }
+  }
+
+  public CertificateServer getRootCertificateServer() {
+    return getSecurityProtocolServer().getRootCertificateServer();
+  }
+
+  public CertificateServer getScmCertificateServer() {
+    return getSecurityProtocolServer().getScmCertificateServer();
+  }
+
+  public SCMCertificateClient getScmCertificateClient() {
+    return scmCertificateClient;
+  }
+
   /**
    * Init the metadata store based on the configurator.
    * @param conf - Config
@@ -617,63 +702,35 @@
    *
    * @param conf
    */
-  private void loginAsSCMUser(ConfigurationSource conf)
+  private static void loginAsSCMUserIfSecurityEnabled(
+      SCMHANodeDetails scmhaNodeDetails, ConfigurationSource conf)
       throws IOException, AuthenticationException {
-    if (LOG.isDebugEnabled()) {
-      ScmConfig scmConfig = configuration.getObject(ScmConfig.class);
-      LOG.debug("Ozone security is enabled. Attempting login for SCM user. "
-              + "Principal: {}, keytab: {}",
-          scmConfig.getKerberosPrincipal(),
-          scmConfig.getKerberosKeytab());
-    }
+    if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+      if (LOG.isDebugEnabled()) {
+        ScmConfig scmConfig = conf.getObject(ScmConfig.class);
+        LOG.debug("Ozone security is enabled. Attempting login for SCM user. "
+                + "Principal: {}, keytab: {}",
+            scmConfig.getKerberosPrincipal(),
+            scmConfig.getKerberosKeytab());
+      }
 
-    Configuration hadoopConf =
-        LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
-    if (SecurityUtil.getAuthenticationMethod(hadoopConf).equals(
-        AuthenticationMethod.KERBEROS)) {
-      UserGroupInformation.setConfiguration(hadoopConf);
-      InetSocketAddress socAddr = HddsServerUtil
-          .getScmBlockClientBindAddress(conf);
-      SecurityUtil.login(hadoopConf,
+      Configuration hadoopConf =
+          LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
+      if (SecurityUtil.getAuthenticationMethod(hadoopConf).equals(
+          AuthenticationMethod.KERBEROS)) {
+        UserGroupInformation.setConfiguration(hadoopConf);
+        InetSocketAddress socketAddress = getScmAddress(scmhaNodeDetails, conf);
+        SecurityUtil.login(hadoopConf,
             ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY,
             ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY,
-            socAddr.getHostName());
-    } else {
-      throw new AuthenticationException(SecurityUtil.getAuthenticationMethod(
-          hadoopConf) + " authentication method not support. "
-          + "SCM user login failed.");
+            socketAddress.getHostName());
+      } else {
+        throw new AuthenticationException(SecurityUtil.getAuthenticationMethod(
+            hadoopConf) + " authentication method not supported. "
+            + "SCM user login failed.");
+      }
+      LOG.info("SCM login successful.");
     }
-    LOG.info("SCM login successful.");
-  }
-
-  /**
-   * This function creates/initializes a certificate server as needed.
-   * This function is idempotent, so calling this again and again after the
-   * server is initialized is not a problem.
-   *
-   * @param clusterID - Cluster ID
-   * @param scmID     - SCM ID
-   */
-  private CertificateServer initializeCertificateServer(String clusterID,
-      String scmID) throws IOException {
-    // TODO: Support Certificate Server loading via Class Name loader.
-    // So it is easy to use different Certificate Servers if needed.
-    String subject = SCM_ROOT_CA_PREFIX +
-        InetAddress.getLocalHost().getHostName();
-    if(this.scmMetadataStore == null) {
-      LOG.error("Cannot initialize Certificate Server without a valid meta " +
-          "data layer.");
-      throw new SCMException("Cannot initialize CA without a valid metadata " +
-          "store", ResultCodes.SCM_NOT_INITIALIZED);
-    }
-
-    CertificateStore certStore =
-        new SCMCertStore.Builder().setMetadaStore(scmMetadataStore)
-            .setRatisServer(scmHAManager.getRatisServer())
-            .setCRLSequenceId(getLastSequenceIdForCRL()).build();
-
-    return new DefaultCAServer(subject, clusterID, scmID, certStore,
-        new DefaultProfile(), SCM_ROOT_CA_COMPONENT_NAME);
   }
 
   long getLastSequenceIdForCRL() throws IOException {
@@ -749,14 +806,17 @@
    * @throws IOException if init fails due to I/O error
    */
   public static boolean scmBootstrap(OzoneConfiguration conf)
-      throws IOException {
+      throws AuthenticationException, IOException {
     if (!SCMHAUtils.isSCMHAEnabled(conf)) {
       LOG.error("Bootstrap is not supported without SCM HA.");
       return false;
     }
+    SCMHANodeDetails scmhaNodeDetails = SCMHANodeDetails.loadSCMHAConfig(conf);
+
+    loginAsSCMUserIfSecurityEnabled(scmhaNodeDetails, conf);
     // The node here will try to fetch the cluster id from any of existing
     // running SCM instances.
-    SCMHANodeDetails scmhaNodeDetails = SCMHANodeDetails.loadSCMHAConfig(conf);
+
     String primordialSCM = SCMHAUtils.getPrimordialSCM(conf);
     String selfNodeId = scmhaNodeDetails.getLocalNodeDetails().getNodeId();
     if (primordialSCM != null && SCMHAUtils.isPrimordialSCM(conf, selfNodeId)) {
@@ -793,7 +853,18 @@
         // SCM Node info containing hostname to scm Id mappings
         // will be persisted into the version file once this node gets added
         // to existing SCM ring post node regular start up.
+
+        if(OzoneSecurityUtil.isSecurityEnabled(conf)) {
+          HASecurityUtils.initializeSecurity(scmStorageConfig,
+              scmInfo.getScmId(), config, getScmAddress(scmhaNodeDetails, conf),
+              false);
+        }
+        scmStorageConfig.setPrimaryScmNodeId(scmInfo.getScmId());
         scmStorageConfig.initialize();
+        LOG.info("SCM bootstrap is successful for ClusterID {}, SCMID {}",
+            scmInfo.getClusterId(), scmStorageConfig.getScmId());
+        LOG.info("Primary SCM Node ID {}",
+            scmStorageConfig.getPrimaryScmNodeId());
       } catch (IOException ioe) {
         LOG.error("Could not initialize SCM version file", ioe);
         return false;
@@ -832,12 +903,20 @@
           Preconditions.checkNotNull(UUID.fromString(clusterId));
           scmStorageConfig.setClusterId(clusterId);
         }
-        scmStorageConfig.initialize();
+
         if (SCMHAUtils.isSCMHAEnabled(conf)) {
           SCMRatisServerImpl.initialize(scmStorageConfig.getClusterID(),
               scmStorageConfig.getScmId(), haDetails.getLocalNodeDetails(),
               conf);
         }
+
+        if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+          HASecurityUtils.initializeSecurity(scmStorageConfig,
+              scmStorageConfig.getScmId(), conf, getScmAddress(haDetails,
+                  conf), true);
+        }
+        scmStorageConfig.setPrimaryScmNodeId(scmStorageConfig.getScmId());
+        scmStorageConfig.initialize();
         LOG.info("SCM initialization succeeded. Current cluster id for sd={}"
                 + "; cid={}; layoutVersion={}; scmId={}",
             scmStorageConfig.getStorageDir(), scmStorageConfig.getClusterID(),
@@ -860,6 +939,40 @@
     }
   }
 
+  private static InetSocketAddress getScmAddress(SCMHANodeDetails haDetails,
+      ConfigurationSource conf) throws IOException {
+    List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(
+        conf);
+    Preconditions.checkNotNull(scmNodeInfoList, "scmNodeInfoList is null");
+
+    InetSocketAddress scmAddress = null;
+    if (SCMHAUtils.getScmServiceId(conf) != null) {
+      for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
+        if (haDetails.getLocalNodeDetails().getNodeId() != null
+            && haDetails.getLocalNodeDetails().getNodeId().equals(
+            scmNodeInfo.getNodeId())) {
+          scmAddress =
+              NetUtils.createSocketAddr(scmNodeInfo.getBlockClientAddress());
+        }
+      }
+    } else  {
+      // Get Local host and use scm client port
+      if (scmNodeInfoList.get(0).getBlockClientAddress() == null) {
+        LOG.error("Unable to determine SCM address from config, falling " +
+            "back to hostname from InetAddress.");
+        scmAddress =
+            NetUtils.createSocketAddr(InetAddress.getLocalHost().getHostName(),
+                ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT);
+      } else {
+        scmAddress = NetUtils.createSocketAddr(
+            scmNodeInfoList.get(0).getBlockClientAddress());
+      }
+    }
+
+
+    return scmAddress;
+  }
+
   /**
    * Initialize SCM metrics.
    */
@@ -1019,6 +1132,7 @@
     getDatanodeProtocolServer().start();
     if (getSecurityProtocolServer() != null) {
       getSecurityProtocolServer().start();
+      persistSCMCertificates();
     }
 
     scmBlockManager.start();
@@ -1039,6 +1153,40 @@
     setStartTime();
   }
 
+  /**
+   * Persist SCM certs to DB on bootstrap SCM nodes.
+   * @throws IOException if a fetched cert cannot be decoded or stored.
+   */
+  private void persistSCMCertificates() throws IOException {
+    // Fetch all CA's and persist during startup on bootstrap nodes. This
+    // is primarily being done to persist primary SCM Cert and Root CA.
+    // TODO: see if we can avoid doing this during every restart.
+    if (primaryScmNodeId != null && !primaryScmNodeId.equals(
+        scmStorageConfig.getScmId())) {
+      List<String> pemEncodedCerts =
+          scmCertificateClient.listCA();
+
+      // Write the primary SCM CA and Root CA during startup.
+      for (String cert : pemEncodedCerts) {
+        try {
+          X509Certificate x509Certificate =
+              CertificateCodec.getX509Certificate(cert);
+          if (certificateStore.getCertificateByID(
+              x509Certificate.getSerialNumber(), VALID_CERTS) == null) {
+            LOG.info("Persist certificate serialId {} on Scm Bootstrap Node " +
+                    "{}", x509Certificate.getSerialNumber(),
+                scmStorageConfig.getScmId());
+            certificateStore.storeValidScmCertificate(
+                x509Certificate.getSerialNumber(), x509Certificate);
+          }
+        } catch (CertificateException ex) {
+          LOG.error("Error while decoding CA Certificate", ex);
+          throw new IOException(ex);
+        }
+      }
+    }
+  }
+
   /**
    * Stop service.
    */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManagerStarter.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManagerStarter.java
index 4c7b693..8960269 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManagerStarter.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManagerStarter.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdds.utils.HddsVersionInfo;
 import org.apache.hadoop.ozone.common.StorageInfo;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import picocli.CommandLine;
@@ -174,7 +175,7 @@
 
     @Override
     public boolean bootStrap(OzoneConfiguration conf)
-        throws IOException{
+        throws AuthenticationException, IOException {
       return StorageContainerManager.scmBootstrap(conf);
     }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
index 1024756..86a8c3e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
@@ -41,7 +41,8 @@
     config = new OzoneConfiguration();
     config.set(OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY,
         OZONE_SCM_SECURITY_SERVICE_BIND_HOST_DEFAULT + ":0");
-    securityProtocolServer = new SCMSecurityProtocolServer(config, null, null);
+    securityProtocolServer = new SCMSecurityProtocolServer(config, null,
+        null, null, null);
   }
 
   @After
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index aab0524..b47c45d 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.cli;
 
 import java.io.IOException;
+import java.security.cert.X509Certificate;
 import java.util.List;
 import java.util.Map;
 
@@ -26,7 +27,6 @@
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -39,11 +39,8 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
 import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
-import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.hdds.utils.HAUtils;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 
 import org.slf4j.Logger;
@@ -53,7 +50,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClient;
 import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
 
 /**
@@ -102,14 +98,11 @@
       throws IOException {
     XceiverClientManager manager;
     if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
-      SecurityConfig securityConfig = new SecurityConfig(conf);
-      SCMSecurityProtocol scmSecurityProtocolClient = getScmSecurityClient(
-          (OzoneConfiguration) securityConfig.getConfiguration());
-      String caCertificate =
-          scmSecurityProtocolClient.getCACertificate();
+      List<X509Certificate> caCertificates =
+          HAUtils.buildCAX509List(null, conf);
       manager = new XceiverClientManager(conf,
           conf.getObject(XceiverClientManager.ScmClientConfig.class),
-          caCertificate);
+          caCertificates);
     } else {
       manager = new XceiverClientManager(conf);
     }
@@ -118,14 +111,7 @@
 
   public static StorageContainerLocationProtocol newContainerRpcClient(
       ConfigurationSource configSource) {
-    SCMContainerLocationFailoverProxyProvider proxyProvider =
-        new SCMContainerLocationFailoverProxyProvider(configSource);
-
-    StorageContainerLocationProtocolClientSideTranslatorPB client =
-        new StorageContainerLocationProtocolClientSideTranslatorPB(
-            proxyProvider);
-    return TracingUtil.createProxy(
-        client, StorageContainerLocationProtocol.class, configSource);
+    return HAUtils.getScmContainerClient(configSource);
   }
 
   @Override
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index ed85a32..c5d591c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -25,8 +25,10 @@
 import java.net.URI;
 import java.security.InvalidKeyException;
 import java.security.SecureRandom;
+import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -172,13 +174,21 @@
     );
     dtService = omTransport.getDelegationTokenService();
     ServiceInfoEx serviceInfoEx = ozoneManagerClient.getServiceInfo();
-    String caCertPem = null;
+    List<X509Certificate> x509Certificates = null;
     if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+      String caCertPem = null;
+      List<String> caCertPems = null;
       caCertPem = serviceInfoEx.getCaCertificate();
+      caCertPems = serviceInfoEx.getCaCertPemList();
+      if (caCertPems == null || caCertPems.isEmpty()) {
+        caCertPems = Collections.singletonList(caCertPem);
+      }
+      x509Certificates = OzoneSecurityUtil.convertToX509(caCertPems);
     }
 
     this.xceiverClientManager = new XceiverClientManager(conf,
-        conf.getObject(XceiverClientManager.ScmClientConfig.class), caCertPem);
+        conf.getObject(XceiverClientManager.ScmClientConfig.class),
+        x509Certificates);
 
     int configuredChunkSize = (int) conf
         .getStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ServiceInfoEx.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ServiceInfoEx.java
index a90be63..e7968b8 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ServiceInfoEx.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ServiceInfoEx.java
@@ -30,11 +30,13 @@
 
   // PEM encoded string of SCM CA certificate.
   private String caCertificate;
+  private List<String> caCertPemList;
 
   public ServiceInfoEx(List<ServiceInfo> infoList,
-      String caCertificate) {
+      String caCertificate, List<String> caCertPemList) {
     this.infoList = infoList;
     this.caCertificate = caCertificate;
+    this.caCertPemList = caCertPemList;
   }
 
   public List<ServiceInfo> getServiceInfoList() {
@@ -44,4 +46,8 @@
   public String getCaCertificate() {
     return caCertificate;
   }
+
+  public List<String> getCaCertPemList() {
+    return caCertPemList;
+  }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index a2b2f48..43d72b9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -1078,7 +1078,7 @@
         resp.getServiceInfoList().stream()
             .map(ServiceInfo::getFromProtobuf)
             .collect(Collectors.toList()),
-        resp.getCaCertificate());
+        resp.getCaCertificate(), resp.getCaCertsList());
   }
 
   /**
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-compose.yaml
index 48adc78..4489a55 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-compose.yaml
+++ b/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-compose.yaml
@@ -41,6 +41,7 @@
   om1:
     <<: *common-config
     environment:
+      WAITFOR: scm3:9865
       ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION
       <<: *replication
     ports:
@@ -51,6 +52,7 @@
   om2:
     <<: *common-config
     environment:
+      WAITFOR: scm3:9865
       ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION
       <<: *replication
     ports:
@@ -61,6 +63,7 @@
   om3:
     <<: *common-config
     environment:
+      WAITFOR: scm3:9865
       ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION
       <<: *replication
     ports:
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-om-ha/.env b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/.env
similarity index 100%
rename from hadoop-ozone/dist/src/main/compose/ozonesecure-om-ha/.env
rename to hadoop-ozone/dist/src/main/compose/ozonesecure-ha/.env
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-om-ha/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-compose.yaml
similarity index 69%
rename from hadoop-ozone/dist/src/main/compose/ozonesecure-om-ha/docker-compose.yaml
rename to hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-compose.yaml
index 4229461..c3e816b 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-om-ha/docker-compose.yaml
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-compose.yaml
@@ -48,9 +48,14 @@
     ports:
       - 9864:9999
     command: ["/opt/hadoop/bin/ozone","datanode"]
+    extra_hosts:
+      - "scm1.org: 172.25.0.116"
+      - "scm2.org: 172.25.0.117"
+      - "scm3.org: 172.25.0.118"
     env_file:
       - docker-config
     environment:
+      WAITFOR: scm3.org:9865
       KERBEROS_KEYTABS: dn HTTP
       OZONE_OPTS:
     networks:
@@ -63,9 +68,14 @@
     ports:
       - 9866:9999
     command: ["/opt/hadoop/bin/ozone","datanode"]
+    extra_hosts:
+      - "scm1.org: 172.25.0.116"
+      - "scm2.org: 172.25.0.117"
+      - "scm3.org: 172.25.0.118"
     env_file:
       - docker-config
     environment:
+      WAITFOR: scm3.org:9865
       KERBEROS_KEYTABS: dn HTTP
       OZONE_OPTS:
     networks:
@@ -78,9 +88,14 @@
     ports:
       - 9868:9999
     command: ["/opt/hadoop/bin/ozone","datanode"]
+    extra_hosts:
+      - "scm1.org: 172.25.0.116"
+      - "scm2.org: 172.25.0.117"
+      - "scm3.org: 172.25.0.118"
     env_file:
       - docker-config
     environment:
+      WAITFOR: scm3.org:9865
       KERBEROS_KEYTABS: dn HTTP
       OZONE_OPTS:
     networks:
@@ -96,12 +111,17 @@
       - 9890:9872
       #- 18001:18001
     environment:
+      WAITFOR: scm3.org:9865
       ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION
       KERBEROS_KEYTABS: om HTTP
       OZONE_OPTS:
     env_file:
       - ./docker-config
     command: ["/opt/hadoop/bin/ozone","om"]
+    extra_hosts:
+      - "scm1.org: 172.25.0.116"
+      - "scm2.org: 172.25.0.117"
+      - "scm3.org: 172.25.0.118"
     networks:
       ozone_net:
         ipv4_address: 172.25.0.111
@@ -115,12 +135,17 @@
       - 9892:9872
       #- 18002:18002
     environment:
+      WAITFOR: scm3.org:9865
       ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION
       KERBEROS_KEYTABS: om HTTP
       OZONE_OPTS:
     env_file:
       - ./docker-config
     command: ["/opt/hadoop/bin/ozone","om"]
+    extra_hosts:
+      - "scm1.org: 172.25.0.116"
+      - "scm2.org: 172.25.0.117"
+      - "scm3.org: 172.25.0.118"
     networks:
       ozone_net:
         ipv4_address: 172.25.0.112
@@ -134,12 +159,17 @@
       - 9894:9872
       #- 18003:18003
     environment:
+      WAITFOR: scm3.org:9865
       ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION
       KERBEROS_KEYTABS: om HTTP
       OZONE_OPTS:
     env_file:
       - ./docker-config
     command: ["/opt/hadoop/bin/ozone","om"]
+    extra_hosts:
+      - "scm1.org: 172.25.0.116"
+      - "scm2.org: 172.25.0.117"
+      - "scm3.org: 172.25.0.118"
     networks:
       ozone_net:
         ipv4_address: 172.25.0.113
@@ -159,35 +189,14 @@
     networks:
       ozone_net:
         ipv4_address: 172.25.0.114
-  recon:
+  scm1.org:
     image: apache/ozone-runner:${OZONE_RUNNER_VERSION}
-    hostname: recon
+    hostname: scm1.org
     volumes:
       - ../..:/opt/hadoop
     ports:
-      - 9888:9888
-      #- 18000:18000
-    env_file:
-      - ./docker-config
-    environment:
-      KERBEROS_KEYTABS: recon HTTP
-      OZONE_OPTS:
-    command: ["/opt/hadoop/bin/ozone","recon"]
-    extra_hosts:
-      - "om1: 172.25.0.111"
-      - "om2: 172.25.0.112"
-      - "om3: 172.25.0.113"
-    networks:
-      ozone_net:
-        ipv4_address: 172.25.0.115
-  scm:
-    image: apache/ozone-runner:${OZONE_RUNNER_VERSION}
-    hostname: scm
-    volumes:
-      - ../..:/opt/hadoop
-    ports:
-      - 9876:9876
-      - 9860:9860
+      - 9990:9876
+      - 9992:9860
     env_file:
       - docker-config
     environment:
@@ -200,9 +209,66 @@
       - "om1: 172.25.0.111"
       - "om2: 172.25.0.112"
       - "om3: 172.25.0.113"
+      - "scm1.org: 172.25.0.116"
+      - "scm2.org: 172.25.0.117"
+      - "scm3.org: 172.25.0.118"
     networks:
       ozone_net:
         ipv4_address: 172.25.0.116
+  scm2.org:
+    image: apache/ozone-runner:${OZONE_RUNNER_VERSION}
+    hostname: scm2.org
+    volumes:
+      - ../..:/opt/hadoop
+    ports:
+      - 9994:9876
+      - 9996:9860
+    env_file:
+      - docker-config
+    environment:
+      WAITFOR: scm1.org:9865
+      KERBEROS_KEYTABS: scm HTTP testuser testuser2
+      ENSURE_SCM_BOOTSTRAPPED: /data/metadata/scm/current/VERSION
+      OZONE-SITE.XML_hdds.scm.safemode.min.datanode: "${OZONE_SAFEMODE_MIN_DATANODES:-3}"
+      OZONE_OPTS:
+    command: ["/opt/hadoop/bin/ozone","scm"]
+    extra_hosts:
+      - "om1: 172.25.0.111"
+      - "om2: 172.25.0.112"
+      - "om3: 172.25.0.113"
+      - "scm1.org: 172.25.0.116"
+      - "scm2.org: 172.25.0.117"
+      - "scm3.org: 172.25.0.118"
+    networks:
+      ozone_net:
+        ipv4_address: 172.25.0.117
+  scm3.org:
+    image: apache/ozone-runner:${OZONE_RUNNER_VERSION}
+    hostname: scm3.org
+    volumes:
+      - ../..:/opt/hadoop
+    ports:
+      - 9998:9876
+      - 10002:9860
+    env_file:
+      - docker-config
+    environment:
+      WAITFOR: scm2.org:9865
+      KERBEROS_KEYTABS: scm HTTP testuser testuser2
+      ENSURE_SCM_BOOTSTRAPPED: /data/metadata/scm/current/VERSION
+      OZONE-SITE.XML_hdds.scm.safemode.min.datanode: "${OZONE_SAFEMODE_MIN_DATANODES:-3}"
+      OZONE_OPTS:
+    command: ["/opt/hadoop/bin/ozone","scm"]
+    extra_hosts:
+      - "om1: 172.25.0.111"
+      - "om2: 172.25.0.112"
+      - "om3: 172.25.0.113"
+      - "scm1.org: 172.25.0.116"
+      - "scm2.org: 172.25.0.117"
+      - "scm3.org: 172.25.0.118"
+    networks:
+      ozone_net:
+        ipv4_address: 172.25.0.118
 networks:
   ozone_net:
     ipam:
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-om-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
similarity index 92%
rename from hadoop-ozone/dist/src/main/compose/ozonesecure-om-ha/docker-config
rename to hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
index eeb05f9..c67e579 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-om-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
@@ -27,12 +27,19 @@
 OZONE-SITE.XML_ozone.om.http-address.id1.om3=om3
 OZONE-SITE.XML_ozone.om.ratis.enable=true
 
+OZONE-SITE.XML_ozone.scm.service.ids=scmservice
+OZONE-SITE.XML_ozone.scm.nodes.scmservice=scm1,scm2,scm3
+OZONE-SITE.XML_ozone.scm.address.scmservice.scm1=scm1.org
+OZONE-SITE.XML_ozone.scm.address.scmservice.scm2=scm2.org
+OZONE-SITE.XML_ozone.scm.address.scmservice.scm3=scm3.org
+OZONE-SITE.XML_ozone.scm.ratis.enable=true
+OZONE-SITE.XML_hdds.scm.ha.security.enable=true
+
 OZONE-SITE.XML_ozone.om.volume.listall.allowed=false
 
 OZONE-SITE.XML_ozone.scm.container.size=1GB
 OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
 OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
-OZONE-SITE.XML_ozone.scm.names=scm
 OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
 OZONE-SITE.XML_ozone.scm.block.client.address=scm
 OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
@@ -61,7 +68,7 @@
 CORE-SITE.XML_hadoop.security.key.provider.path=kms://http@kms:9600/kms
 
 
-OZONE-SITE.XML_hdds.scm.kerberos.principal=scm/scm@EXAMPLE.COM
+OZONE-SITE.XML_hdds.scm.kerberos.principal=scm/_HOST@EXAMPLE.COM
 OZONE-SITE.XML_hdds.scm.kerberos.keytab.file=/etc/security/keytabs/scm.keytab
 OZONE-SITE.XML_ozone.om.kerberos.principal=om/_HOST@EXAMPLE.COM
 OZONE-SITE.XML_ozone.om.kerberos.keytab.file=/etc/security/keytabs/om.keytab
@@ -83,7 +90,7 @@
 OZONE-SITE.XML_ozone.s3g.http.auth.type=kerberos
 OZONE-SITE.XML_ozone.recon.http.auth.type=kerberos
 
-OZONE-SITE.XML_hdds.scm.http.auth.kerberos.principal=HTTP/scm@EXAMPLE.COM
+OZONE-SITE.XML_hdds.scm.http.auth.kerberos.principal=HTTP/_HOST@EXAMPLE.COM
 OZONE-SITE.XML_hdds.scm.http.auth.kerberos.keytab=/etc/security/keytabs/HTTP.keytab
 OZONE-SITE.XML_ozone.om.http.auth.kerberos.principal=HTTP/_HOST@EXAMPLE.COM
 OZONE-SITE.XML_ozone.om.http.auth.kerberos.keytab=/etc/security/keytabs/HTTP.keytab
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-om-ha/test.sh b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh
similarity index 88%
rename from hadoop-ozone/dist/src/main/compose/ozonesecure-om-ha/test.sh
rename to hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh
index 9fba980..dbc7a69 100755
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-om-ha/test.sh
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh
@@ -20,17 +20,18 @@
 
 export SECURITY_ENABLED=true
 export OM_SERVICE_ID="id1"
+export SCM=scm1.org
 
 # shellcheck source=/dev/null
 source "$COMPOSE_DIR/../testlib.sh"
 
 start_docker_env
 
-execute_robot_test scm kinit.robot
+execute_robot_test ${SCM} kinit.robot
 
-execute_robot_test scm freon
+execute_robot_test ${SCM} freon
 
-execute_robot_test scm basic/links.robot
+execute_robot_test ${SCM} basic/links.robot
 
 stop_docker_env
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index b0ce55d..ec51c42 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -329,7 +329,7 @@
    */
   @Override
   public StorageContainerLocationProtocolClientSideTranslatorPB
-      getStorageContainerLocationClient() {
+      getStorageContainerLocationClient() throws IOException {
     InetSocketAddress address = scm.getClientRpcAddress();
     LOG.info(
         "Creating StorageContainerLocationProtocol RPC client with address {}",
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
index d0ead1b..33d1c1c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails;
 import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
@@ -56,6 +57,7 @@
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.client.CertificateClientTestImpl;
 import org.apache.hadoop.ozone.common.Storage;
 import org.apache.hadoop.ozone.om.OMStorage;
@@ -302,9 +304,12 @@
       assertNotNull(scmSecurityProtocolClient);
       String caCert = scmSecurityProtocolClient.getCACertificate();
       assertNotNull(caCert);
+      // Request a certificate with serial id 100, which will not exist
+      // because serial ids are timestamp based. Serial id 1 is the root CA
+      // certificate and is persisted in the DB, so it cannot be used here.
       LambdaTestUtils.intercept(SCMSecurityException.class,
           "Certificate not found",
-          () -> scmSecurityProtocolClient.getCertificate("1"));
+          () -> scmSecurityProtocolClient.getCertificate("100"));
 
       // Case 2: User without Kerberos credentials should fail.
       ugi = UserGroupInformation.createRemoteUser("test");
@@ -336,6 +341,10 @@
     SCMStorageConfig scmStore = new SCMStorageConfig(conf);
     scmStore.setClusterId(clusterId);
     scmStore.setScmId(scmId);
+    HASecurityUtils.initializeSecurity(scmStore, scmId, conf,
+        NetUtils.createSocketAddr(InetAddress.getLocalHost().getHostName(),
+            OZONE_SCM_CLIENT_PORT_DEFAULT), true);
+    scmStore.setPrimaryScmNodeId(scmId);
     // writes the version file properties
     scmStore.initialize();
     if (SCMHAUtils.isSCMHAEnabled(conf)) {
@@ -728,7 +737,7 @@
     X500Name x500Issuer = new JcaX509CertificateHolder(cert).getIssuer();
     RDN cn = x500Issuer.getRDNs(BCStyle.CN)[0];
     String hostName = InetAddress.getLocalHost().getHostName();
-    String scmUser = "scm@" + hostName;
+    String scmUser = OzoneConsts.SCM_SUB_CA_PREFIX + hostName;
     assertEquals(scmUser, cn.getFirst().getValue().toString());
 
     // Subject name should be om login user in real world but in this test
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
index feaf633..97ecdd5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.client;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.security.KeyPair;
 import java.security.PrivateKey;
@@ -188,4 +189,27 @@
     return null;
   }
 
+  @Override
+  public X509Certificate getRootCACertificate() {
+    return x509Certificate;
+  }
+
+  @Override
+  public void storeRootCACertificate(String pemEncodedCert, boolean force) {
+
+  }
+
+  @Override
+  public List<String> getCAList() {
+    return null;
+  }
+  @Override
+  public List<String> listCA() throws IOException {
+    return null;
+  }
+
+  @Override
+  public List<String> updateCAList() throws IOException {
+    return null;
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
index c025ab3..3f7245b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
@@ -52,6 +52,7 @@
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -151,7 +152,7 @@
       container.start(UUID.randomUUID().toString());
 
       XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf,
-          caClient.getCACertificate());
+          Collections.singletonList(caClient.getCACertificate()));
 
       if (blockTokenEnabled) {
         secretManager.start(caClient);
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 64c2eb6..7269c9a 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1033,6 +1033,8 @@
     // When security is enabled, return SCM CA certificate to Ozone client
     // to set up gRPC TLS for client to authenticate server(DN).
     optional string caCertificate = 3;
+
+    repeated string caCerts = 4;
 }
 
 message DBUpdatesResponse {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index abc0283..382d4e7 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -69,8 +69,6 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificate.client.OMCertificateClient;
@@ -78,7 +76,6 @@
 import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.hdds.server.http.RatisDropwizardExports;
-import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdds.utils.HAUtils;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
@@ -261,6 +258,7 @@
   private OzoneBlockTokenSecretManager blockTokenMgr;
   private CertificateClient certClient;
   private String caCertPem = null;
+  private List<String> caCertPemList = new ArrayList<>();
   private static boolean testSecureOmFlag = false;
   private final Text omRpcAddressTxt;
   private final OzoneConfiguration configuration;
@@ -845,17 +843,10 @@
    * Returns a scm container client.
    *
    * @return {@link StorageContainerLocationProtocol}
-   * @throws IOException
    */
   private static StorageContainerLocationProtocol getScmContainerClient(
       OzoneConfiguration conf) {
-    SCMContainerLocationFailoverProxyProvider proxyProvider =
-        new SCMContainerLocationFailoverProxyProvider(conf);
-    StorageContainerLocationProtocol scmContainerClient =
-        TracingUtil.createProxy(
-            new StorageContainerLocationProtocolClientSideTranslatorPB(
-                proxyProvider), StorageContainerLocationProtocol.class, conf);
-    return scmContainerClient;
+    return HAUtils.getScmContainerClient(conf);
   }
 
   /**
@@ -1123,10 +1114,14 @@
     metadataManager.start(configuration);
     startSecretManagerIfNecessary();
 
+
+    // Keep the single CA cert field populated for backward compatibility
     if (certClient != null) {
-      caCertPem = CertificateCodec.getPEMEncodedString(
-          certClient.getCACertificate());
+      caCertPem =
+          CertificateCodec.getPEMEncodedString(certClient.getCACertificate());
+      caCertPemList = HAUtils.buildCAList(certClient, configuration);
     }
+
     // Set metrics and start metrics back ground thread
     metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager
         .getVolumeTable()));
@@ -1478,6 +1473,13 @@
         String pemEncodedRootCert = response.getX509CACertificate();
         client.storeCertificate(pemEncodedRootCert, true, true);
         client.storeCertificate(pemEncodedCert, true);
+
+        // Store Root CA certificate if available.
+        if (response.hasX509RootCACertificate()) {
+          client.storeRootCACertificate(response.getX509RootCACertificate(),
+              true);
+        }
+
         // Persist om cert serial id.
         omStore.setOmCertSerialId(CertificateCodec.
             getX509Certificate(pemEncodedCert).getSerialNumber().toString());
@@ -2675,7 +2677,7 @@
 
   @Override
   public ServiceInfoEx getServiceInfo() throws IOException {
-    return new ServiceInfoEx(getServiceList(), caCertPem);
+    return new ServiceInfoEx(getServiceList(), caCertPem, caCertPemList);
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index 28a16f9..02e5e18 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -23,6 +23,7 @@
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
+import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -37,6 +38,7 @@
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.hdds.utils.HAUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine.Server;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -647,13 +649,15 @@
   }
 
   private static Parameters createServerTlsParameters(SecurityConfig conf,
-      CertificateClient caClient) {
+      CertificateClient caClient) throws IOException {
     Parameters parameters = new Parameters();
 
     if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
+      List<X509Certificate> caList = HAUtils.buildCAX509List(caClient,
+          conf.getConfiguration());
       GrpcTlsConfig config = new GrpcTlsConfig(
           caClient.getPrivateKey(), caClient.getCertificate(),
-          caClient.getCACertificate(), true);
+          caList, true);
       GrpcConfigKeys.Server.setTlsConf(parameters, config);
       GrpcConfigKeys.Admin.setTlsConf(parameters, config);
       GrpcConfigKeys.Client.setTlsConf(parameters, config);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index d9547c4..15cfe6a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -461,6 +461,11 @@
     if (serviceInfoEx.getCaCertificate() != null) {
       resp.setCaCertificate(serviceInfoEx.getCaCertificate());
     }
+
+    for (String ca : serviceInfoEx.getCaCertPemList()) {
+      resp.addCaCerts(ca);
+    }
+
     return resp.build();
   }
 
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
index 3e88148..6d24968 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
@@ -32,9 +32,7 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
-import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.hdds.utils.HAUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.client.OzoneClient;
@@ -342,15 +340,8 @@
   }
 
   public StorageContainerLocationProtocol createStorageContainerLocationClient(
-      OzoneConfiguration ozoneConf) {
-    SCMContainerLocationFailoverProxyProvider proxyProvider =
-        new SCMContainerLocationFailoverProxyProvider(ozoneConf);
-    StorageContainerLocationProtocol client =
-        TracingUtil.createProxy(
-            new StorageContainerLocationProtocolClientSideTranslatorPB(
-                proxyProvider),
-            StorageContainerLocationProtocol.class, ozoneConf);
-    return client;
+      OzoneConfiguration ozoneConf) throws IOException {
+    return HAUtils.getScmContainerClient(ozoneConf);
   }
 
   public static Pipeline findPipelineForTest(String pipelineId,