HDDS-4897. [SCM HA Security] Create SCM Cert Client and change DefaultCA to allow selfsigned and intermediary (#2041)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
index 53d8e9a..03e4c53 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
@@ -293,7 +293,8 @@
    * @throws CertificateEncodingException - on Error.
    * @throws IOException                  - on Error.
    */
-  public X509CertificateHolder getCertificateHolder(X509Certificate x509cert)
+  public static X509CertificateHolder getCertificateHolder(
+      X509Certificate x509cert)
       throws CertificateEncodingException, IOException {
     return new X509CertificateHolder(x509cert.getEncoded());
   }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 33114fc..b12a022 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -25,6 +25,7 @@
 
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
 import java.util.regex.Pattern;
 
 import static org.apache.ratis.thirdparty.io.grpc.Metadata.ASCII_STRING_MARSHALLER;
@@ -39,6 +40,7 @@
   public static final String STORAGE_DIR = "scm";
   public static final String SCM_ID = "scmUuid";
   public static final String CLUSTER_ID_PREFIX = "CID-";
+  public static final String SCM_CERT_SERIAL_ID = "scmCertSerialId";
 
   public static final String OZONE_SIMPLE_ROOT_USER = "root";
   public static final String OZONE_SIMPLE_HDFS_USER = "hdfs";
@@ -416,4 +418,14 @@
 
   // CRL Sequence Id
   public static final String CRL_SEQUENCE_ID_KEY = "CRL_SEQUENCE_ID";
+
+  public static final String SCM_CA_PATH = "ca";
+  public static final String SCM_CA_CERT_STORAGE_DIR = "scm";
+  public static final String SCM_SUB_CA_PATH = "sub-ca";
+
+  public static final String SCM_ROOT_CA_COMPONENT_NAME =
+      Paths.get(SCM_CA_CERT_STORAGE_DIR, SCM_CA_PATH).toString();
+
+  public static final String SCM_SUB_CA_PREFIX = "scm-sub@";
+  public static final String SCM_ROOT_CA_PREFIX = "scm@";
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
index dc70e4f..7622dda 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
@@ -27,7 +27,6 @@
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultProfile;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.PKIProfile;
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.security.x509.certificates.utils.SelfSignedCertificate;
@@ -120,7 +119,7 @@
   private final String subject;
   private final String clusterID;
   private final String scmID;
-  private String componentName = Paths.get("scm", "ca").toString();
+  private String componentName;
   private Path caKeysPath;
   private Path caRootX509Path;
   private SecurityConfig config;
@@ -141,11 +140,14 @@
    * @param certificateStore - A store used to persist Certificates.
    */
   public DefaultCAServer(String subject, String clusterID, String scmID,
-                         CertificateStore certificateStore) {
+                         CertificateStore certificateStore,
+      PKIProfile pkiProfile, String componentName) {
     this.subject = subject;
     this.clusterID = clusterID;
     this.scmID = scmID;
     this.store = certificateStore;
+    this.profile = pkiProfile;
+    this.componentName = componentName;
     lock = new ReentrantLock();
   }
 
@@ -155,28 +157,18 @@
     caKeysPath = securityConfig.getKeyLocation(componentName);
     caRootX509Path = securityConfig.getCertificateLocation(componentName);
     this.config = securityConfig;
-
-    // TODO: Make these configurable and load different profiles based on
-    // config.
-    profile = new DefaultProfile();
     this.approver = new DefaultApprover(profile, this.config);
 
     /* In future we will split this code to have different kind of CAs.
      * Right now, we have only self-signed CertificateServer.
      */
 
-    if (type == CAType.SELF_SIGNED_CA) {
-      VerificationStatus status = verifySelfSignedCA(securityConfig);
-      Consumer<SecurityConfig> caInitializer =
-          processVerificationStatus(status);
-      caInitializer.accept(securityConfig);
-      crlApprover = new DefaultCRLApprover(securityConfig,
-          getCAKeys().getPrivate());
-      return;
-    }
-
-    LOG.error("We support only Self-Signed CAs for now.");
-    throw new IllegalStateException("Not implemented functionality requested.");
+    VerificationStatus status = verifySelfSignedCA(securityConfig);
+    Consumer<SecurityConfig> caInitializer =
+        processVerificationStatus(status, type);
+    caInitializer.accept(securityConfig);
+    crlApprover = new DefaultCRLApprover(securityConfig,
+        getCAKeys().getPrivate());
   }
 
   @Override
@@ -269,6 +261,7 @@
       LocalDate endDate, PKCS10CertificationRequest csr, NodeType role)
       throws IOException,
       OperatorCreationException, CertificateException {
+
     lock.lock();
     X509CertificateHolder xcert;
     try {
@@ -276,9 +269,11 @@
           getCAKeys().getPrivate(),
           getCACertificate(), java.sql.Date.valueOf(beginDate),
           java.sql.Date.valueOf(endDate), csr, scmID, clusterID);
-      store.checkValidCertID(xcert.getSerialNumber());
-      store.storeValidCertificate(xcert.getSerialNumber(),
-          CertificateCodec.getX509Certificate(xcert), role);
+      if (store != null) {
+        store.checkValidCertID(xcert.getSerialNumber());
+        store.storeValidCertificate(xcert.getSerialNumber(),
+            CertificateCodec.getX509Certificate(xcert), role);
+      }
     } finally {
       lock.unlock();
     }
@@ -425,7 +420,7 @@
    */
   @VisibleForTesting
   Consumer<SecurityConfig> processVerificationStatus(
-      VerificationStatus status) {
+      VerificationStatus status,  CAType type) {
     Consumer<SecurityConfig> consumer = null;
     switch (status) {
     case SUCCESS:
@@ -453,19 +448,31 @@
       };
       break;
     case INITIALIZE:
-      consumer = (arg) -> {
-        try {
-          generateSelfSignedCA(arg);
-        } catch (NoSuchProviderException | NoSuchAlgorithmException
-            | IOException e) {
-          LOG.error("Unable to initialize CertificateServer.", e);
-        }
-        VerificationStatus newStatus = verifySelfSignedCA(arg);
-        if (newStatus != VerificationStatus.SUCCESS) {
-          LOG.error("Unable to initialize CertificateServer, failed in " +
-              "verification.");
-        }
-      };
+      if (type == CAType.SELF_SIGNED_CA) {
+        consumer = (arg) -> {
+          try {
+            generateSelfSignedCA(arg);
+          } catch (NoSuchProviderException | NoSuchAlgorithmException
+              | IOException e) {
+            LOG.error("Unable to initialize CertificateServer.", e);
+          }
+          VerificationStatus newStatus = verifySelfSignedCA(arg);
+          if (newStatus != VerificationStatus.SUCCESS) {
+            LOG.error("Unable to initialize CertificateServer, failed in " +
+                "verification.");
+          }
+        };
+      } else if (type == CAType.INTERMEDIARY_CA) {
+        // For sub CA certificates are generated during bootstrap/init. If
+        // both keys/certs are missing, init/bootstrap is missed to be
+        // performed.
+        consumer = (arg) -> {
+          LOG.error("Sub SCM CA Server is missing keys/certs. SCM is started " +
+              "with out init/bootstrap");
+          throw new IllegalStateException("INTERMEDIARY_CA Should not be" +
+              " in Initialize State during startup.");
+        };
+      }
       break;
     default:
       /* Make CheckStyle happy */
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/PKIProfiles/DefaultCAProfile.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/PKIProfiles/DefaultCAProfile.java
index 53eb98f..a146c73 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/PKIProfiles/DefaultCAProfile.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/PKIProfiles/DefaultCAProfile.java
@@ -19,8 +19,12 @@
 
 package org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles;
 
+import org.bouncycastle.asn1.ASN1ObjectIdentifier;
+import org.bouncycastle.asn1.x509.BasicConstraints;
 import org.bouncycastle.asn1.x509.Extension;
+import org.bouncycastle.asn1.x509.KeyUsage;
 
+import java.util.Map;
 import java.util.function.BiFunction;
 
 import static java.lang.Boolean.TRUE;
@@ -32,7 +36,7 @@
  */
 public class DefaultCAProfile extends DefaultProfile {
   static final BiFunction<Extension, PKIProfile, Boolean>
-      VALIDATE_BASIC_CONSTRAINTS = (e, b) -> TRUE;
+      VALIDATE_BASIC_CONSTRAINTS = DefaultCAProfile::validateBasicExtensions;
   static final BiFunction<Extension, PKIProfile, Boolean>
       VALIDATE_CRL_NUMBER = (e, b) -> TRUE;
   static final BiFunction<Extension, PKIProfile, Boolean>
@@ -43,4 +47,38 @@
       VALIDATE_NAME_CONSTRAINTS = (e, b) -> TRUE;
   static final BiFunction<Extension, PKIProfile, Boolean>
       VALIDATE_CRL_DISTRIBUTION_POINTS = (e, b) -> TRUE;
-}
+
+
+  private static boolean validateBasicExtensions(Extension ext,
+      PKIProfile pkiProfile) {
+    BasicConstraints constraints =
+        BasicConstraints.getInstance(ext.getParsedValue());
+    if(constraints.isCA()) {
+      if (pkiProfile.isCA()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean isCA() {
+    return true;
+  }
+
+  @Override
+  public Map<ASN1ObjectIdentifier,
+      BiFunction< Extension, PKIProfile, Boolean>> getExtensionsMap() {
+    // Add basic constraint.
+    EXTENSIONS_MAP.putIfAbsent(Extension.basicConstraints,
+        VALIDATE_BASIC_CONSTRAINTS);
+    return EXTENSIONS_MAP;
+  }
+
+  @Override
+  public KeyUsage getKeyUsage() {
+    return new KeyUsage(KeyUsage.digitalSignature | KeyUsage.keyEncipherment
+        | KeyUsage.dataEncipherment | KeyUsage.keyAgreement | KeyUsage.cRLSign
+        | KeyUsage.keyCertSign);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/PKIProfiles/DefaultProfile.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/PKIProfiles/DefaultProfile.java
index 18659dc..7791fa9 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/PKIProfiles/DefaultProfile.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/PKIProfiles/DefaultProfile.java
@@ -77,7 +77,7 @@
       GeneralName.otherName,
   };
   // Map that handles all the Extensions lookup and validations.
-  private static final Map<ASN1ObjectIdentifier, BiFunction<Extension,
+  protected static final Map<ASN1ObjectIdentifier, BiFunction<Extension,
       PKIProfile, Boolean>> EXTENSIONS_MAP = Stream.of(
       new SimpleEntry<>(Extension.keyUsage, VALIDATE_KEY_USAGE),
       new SimpleEntry<>(Extension.subjectAlternativeName, VALIDATE_SAN),
@@ -266,7 +266,7 @@
    */
   @Override
   public ASN1ObjectIdentifier[] getSupportedExtensions() {
-    return EXTENSIONS_MAP.keySet().toArray(new ASN1ObjectIdentifier[0]);
+    return getExtensionsMap().keySet().toArray(new ASN1ObjectIdentifier[0]);
   }
 
   /**
@@ -274,7 +274,7 @@
    */
   @Override
   public boolean isSupportedExtension(Extension extension) {
-    return EXTENSIONS_MAP.containsKey(extension.getExtnId());
+    return getExtensionsMap().containsKey(extension.getExtnId());
   }
 
   /**
@@ -337,4 +337,10 @@
   public boolean isCA() {
     return false;
   }
+
+  @Override
+  public Map<ASN1ObjectIdentifier, BiFunction< Extension, PKIProfile,
+      Boolean>> getExtensionsMap() {
+    return EXTENSIONS_MAP;
+  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/PKIProfiles/PKIProfile.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/PKIProfiles/PKIProfile.java
index c3ff198..c4878dd 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/PKIProfiles/PKIProfile.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/PKIProfiles/PKIProfile.java
@@ -26,6 +26,8 @@
 import org.bouncycastle.asn1.x509.KeyUsage;
 
 import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.function.BiFunction;
 
 /**
  * Base class for profile rules. Generally profiles are documents that define
@@ -137,4 +139,11 @@
    * @return  True, if the profile used is for CA, false otherwise.
    */
   boolean isCA();
+
+  /**
+   * Return all extensions supported by this profile.
+   * @return
+   */
+  Map<ASN1ObjectIdentifier,
+        BiFunction< Extension, PKIProfile, Boolean> > getExtensionsMap();
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
index c776398..0ec4d42 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
@@ -212,4 +212,10 @@
    */
   String getSecurityProvider();
 
+  /**
+   * Return component name of this certificate client.
+   * @return component name
+   */
+  String getComponentName();
+
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DNCertificateClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DNCertificateClient.java
index 40c5b0a..8c7c9f0 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DNCertificateClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DNCertificateClient.java
@@ -64,4 +64,9 @@
   public Logger getLogger() {
     return LOG;
   }
+
+  @Override
+  public String getComponentName() {
+    return COMPONENT_NAME;
+  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
index 8ee0019..1b04356 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
@@ -839,4 +839,8 @@
                 Client.getRpcTimeout(conf)));
     return scmSecurityClient;
   }
+
+  public String getComponentName() {
+    return null;
+  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
index 2e1b204..0c7054a 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
@@ -124,4 +124,9 @@
   public Logger getLogger() {
     return LOG;
   }
+
+  @Override
+  public String getComponentName() {
+    return COMPONENT_NAME;
+  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/SCMCertificateClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/SCMCertificateClient.java
new file mode 100644
index 0000000..d1f9040
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/SCMCertificateClient.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.x509.certificate.client;
+
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
+import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Paths;
+
+import static org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse.FAILURE;
+import static org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse.GETCERT;
+import static org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse.RECOVER;
+import static org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse.SUCCESS;
+
+/**
+ * SCM Certificate Client which is used for generating public/private Key pair,
+ * generate CSR and finally obtain signed certificate. This Certificate
+ * client is used for setting up sub CA by SCM.
+ */
+public class SCMCertificateClient extends DefaultCertificateClient {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMCertificateClient.class);
+
+  public static final String COMPONENT_NAME =
+      Paths.get(OzoneConsts.SCM_CA_CERT_STORAGE_DIR,
+          OzoneConsts.SCM_SUB_CA_PATH).toString();
+
+  public SCMCertificateClient(SecurityConfig securityConfig,
+      String certSerialId) {
+    super(securityConfig, LOG, certSerialId, COMPONENT_NAME);
+  }
+
+  public SCMCertificateClient(SecurityConfig securityConfig) {
+    super(securityConfig, LOG, null, COMPONENT_NAME);
+  }
+
+  @Override
+  protected InitResponse handleCase(InitCase init)
+      throws CertificateException {
+    // This is similar to OM.
+    switch (init) {
+    case NONE:
+      LOG.info("Creating keypair for client as keypair and certificate not " +
+          "found.");
+      bootstrapClientKeys();
+      return GETCERT;
+    case CERT:
+      LOG.error("Private key not found, while certificate is still present." +
+          "Delete keypair and try again.");
+      return FAILURE;
+    case PUBLIC_KEY:
+      LOG.error("Found public key but private key and certificate missing.");
+      return FAILURE;
+    case PRIVATE_KEY:
+      LOG.info("Found private key but public key and certificate is missing.");
+      // TODO: Recovering public key from private might be possible in some
+      //  cases.
+      return FAILURE;
+    case PUBLICKEY_CERT:
+      LOG.error("Found public key and certificate but private key is " +
+          "missing.");
+      return FAILURE;
+    case PRIVATEKEY_CERT:
+      LOG.info("Found private key and certificate but public key missing.");
+      if (recoverPublicKey()) {
+        return SUCCESS;
+      } else {
+        LOG.error("Public key recovery failed.");
+        return FAILURE;
+      }
+    case PUBLICKEY_PRIVATEKEY:
+      LOG.info("Found private and public key but certificate is missing.");
+      if (validateKeyPair(getPublicKey())) {
+        return RECOVER;
+      } else {
+        LOG.error("Keypair validation failed.");
+        return FAILURE;
+      }
+    case ALL:
+      LOG.info("Found certificate file along with KeyPair.");
+      if (validateKeyPairAndCertificate()) {
+        return SUCCESS;
+      } else {
+        return FAILURE;
+      }
+    default:
+      LOG.error("Unexpected case: {} (private/public/cert)",
+          Integer.toBinaryString(init.ordinal()));
+      return FAILURE;
+    }
+  }
+
+  /**
+   * Returns a CSR builder that can be used to creates a Certificate signing
+   * request.
+   *
+   * @return CertificateSignRequest.Builder
+   */
+  @Override
+  public CertificateSignRequest.Builder getCSRBuilder()
+      throws CertificateException {
+    return super.getCSRBuilder()
+        .setDigitalEncryption(true)
+        .setDigitalSignature(true)
+        // Set CA to true, as this will be used to sign certs for OM/DN.
+        .setCA(true);
+  }
+
+
+  @Override
+  public Logger getLogger() {
+    return LOG;
+  }
+
+  @Override
+  public String getComponentName() {
+    return COMPONENT_NAME;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java
index 79ff3a5..f5a2d8c 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java
@@ -23,6 +23,10 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultProfile;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
 import org.apache.hadoop.test.LambdaTestUtils;
@@ -31,6 +35,7 @@
 import org.bouncycastle.cert.X509CertificateHolder;
 import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
 import org.bouncycastle.pkcs.PKCS10CertificationRequest;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -38,6 +43,7 @@
 
 import java.io.IOException;
 import java.math.BigInteger;
+import java.nio.file.Paths;
 import java.security.KeyPair;
 import java.security.NoSuchAlgorithmException;
 import java.security.NoSuchProviderException;
@@ -56,6 +62,10 @@
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType.OM;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType.SCM;
+import static org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer.CAType.INTERMEDIARY_CA;
+import static org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer.CAType.SELF_SIGNED_CA;
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_CA_CERT_STORAGE_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_CA_PATH;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -81,24 +91,16 @@
     SecurityConfig securityConfig = new SecurityConfig(conf);
     CertificateServer testCA = new DefaultCAServer("testCA",
         RandomStringUtils.randomAlphabetic(4),
-        RandomStringUtils.randomAlphabetic(4), caStore);
-    testCA.init(securityConfig, CertificateServer.CAType.SELF_SIGNED_CA);
+        RandomStringUtils.randomAlphabetic(4), caStore,
+        new DefaultProfile(),
+        Paths.get(SCM_CA_CERT_STORAGE_DIR, SCM_CA_PATH).toString());
+    testCA.init(securityConfig, SELF_SIGNED_CA);
     X509CertificateHolder first = testCA.getCACertificate();
     assertNotNull(first);
     //Init is idempotent.
-    testCA.init(securityConfig, CertificateServer.CAType.SELF_SIGNED_CA);
+    testCA.init(securityConfig, SELF_SIGNED_CA);
     X509CertificateHolder second = testCA.getCACertificate();
     assertEquals(first, second);
-
-    // we only support Self Signed CA for now.
-    try {
-      testCA.init(securityConfig, CertificateServer.CAType.INTERMEDIARY_CA);
-      fail("code should not reach here, exception should have been thrown.");
-    } catch (IllegalStateException e) {
-      // This is a run time exception, hence it is not caught by the junit
-      // expected Exception.
-      assertTrue(e.toString().contains("Not implemented"));
-    }
   }
 
   @Test
@@ -106,10 +108,12 @@
     SecurityConfig securityConfig = new SecurityConfig(conf);
     CertificateServer testCA = new DefaultCAServer("testCA",
         RandomStringUtils.randomAlphabetic(4),
-        RandomStringUtils.randomAlphabetic(4), caStore);
+        RandomStringUtils.randomAlphabetic(4), caStore,
+        new DefaultProfile(),
+        Paths.get(SCM_CA_CERT_STORAGE_DIR, SCM_CA_PATH).toString());
     Consumer<SecurityConfig> caInitializer =
         ((DefaultCAServer) testCA).processVerificationStatus(
-        DefaultCAServer.VerificationStatus.MISSING_CERTIFICATE);
+        DefaultCAServer.VerificationStatus.MISSING_CERTIFICATE, SELF_SIGNED_CA);
     try {
 
       caInitializer.accept(securityConfig);
@@ -126,10 +130,12 @@
     SecurityConfig securityConfig = new SecurityConfig(conf);
     CertificateServer testCA = new DefaultCAServer("testCA",
         RandomStringUtils.randomAlphabetic(4),
-        RandomStringUtils.randomAlphabetic(4), caStore);
+        RandomStringUtils.randomAlphabetic(4), caStore,
+        new DefaultProfile(),
+        Paths.get(SCM_CA_CERT_STORAGE_DIR, SCM_CA_PATH).toString());
     Consumer<SecurityConfig> caInitializer =
         ((DefaultCAServer) testCA).processVerificationStatus(
-            DefaultCAServer.VerificationStatus.MISSING_KEYS);
+            DefaultCAServer.VerificationStatus.MISSING_KEYS, SELF_SIGNED_CA);
     try {
 
       caInitializer.accept(securityConfig);
@@ -175,9 +181,11 @@
     String csrString = CertificateSignRequest.getEncodedString(csr);
 
     CertificateServer testCA = new DefaultCAServer("testCA",
-        clusterId, scmId, caStore);
+        clusterId, scmId, caStore,
+        new DefaultProfile(),
+        Paths.get(SCM_CA_CERT_STORAGE_DIR, SCM_CA_PATH).toString());
     testCA.init(new SecurityConfig(conf),
-        CertificateServer.CAType.SELF_SIGNED_CA);
+        SELF_SIGNED_CA);
 
     Future<X509CertificateHolder> holder = testCA.requestCertificate(csrString,
         CertificateApprover.ApprovalType.TESTING_AUTOMATIC, SCM);
@@ -217,9 +225,11 @@
 
     CertificateServer testCA = new DefaultCAServer("testCA",
         RandomStringUtils.randomAlphabetic(4),
-        RandomStringUtils.randomAlphabetic(4), caStore);
+        RandomStringUtils.randomAlphabetic(4), caStore,
+        new DefaultProfile(),
+        Paths.get(SCM_CA_CERT_STORAGE_DIR, SCM_CA_PATH).toString());
     testCA.init(new SecurityConfig(conf),
-        CertificateServer.CAType.SELF_SIGNED_CA);
+        SELF_SIGNED_CA);
 
     Future<X509CertificateHolder> holder = testCA.requestCertificate(csrString,
         CertificateApprover.ApprovalType.TESTING_AUTOMATIC, OM);
@@ -235,9 +245,11 @@
     Date now = new Date();
 
     CertificateServer testCA = new DefaultCAServer("testCA",
-        clusterId, scmId, caStore);
+        clusterId, scmId, caStore,
+        new DefaultProfile(),
+        Paths.get(SCM_CA_CERT_STORAGE_DIR, SCM_CA_PATH).toString());
     testCA.init(new SecurityConfig(conf),
-        CertificateServer.CAType.SELF_SIGNED_CA);
+        SELF_SIGNED_CA);
 
     KeyPair keyPair =
         new HDDSKeyGenerator(conf).generateKey();
@@ -301,9 +313,11 @@
 
     CertificateServer testCA = new DefaultCAServer("testCA",
         RandomStringUtils.randomAlphabetic(4),
-        RandomStringUtils.randomAlphabetic(4), caStore);
+        RandomStringUtils.randomAlphabetic(4), caStore,
+        new DefaultProfile(),
+        Paths.get(SCM_CA_CERT_STORAGE_DIR, SCM_CA_PATH).toString());
     testCA.init(new SecurityConfig(conf),
-        CertificateServer.CAType.SELF_SIGNED_CA);
+        SELF_SIGNED_CA);
 
     LambdaTestUtils.intercept(ExecutionException.class, "ScmId and " +
             "ClusterId in CSR subject are incorrect",
@@ -316,4 +330,82 @@
         });
   }
 
+  @Test(expected = IllegalStateException.class)
+  public void testIntermediaryCAWithEmpty()
+      throws Exception {
+
+    CertificateServer scmCA = new DefaultCAServer("testCA",
+        RandomStringUtils.randomAlphabetic(4),
+        RandomStringUtils.randomAlphabetic(4), caStore,
+        new DefaultProfile(), Paths.get("scm").toString());
+
+    scmCA.init(new SecurityConfig(conf), INTERMEDIARY_CA);
+  }
+
+  @Test
+  public void testIntermediaryCA() throws Exception {
+
+    String clusterId = RandomStringUtils.randomAlphanumeric(4);
+    String scmId = RandomStringUtils.randomAlphanumeric(4);
+
+    CertificateServer rootCA = new DefaultCAServer("rootCA",
+        clusterId, scmId, caStore, new DefaultProfile(),
+        Paths.get("scm", "ca").toString());
+
+    rootCA.init(new SecurityConfig(conf), SELF_SIGNED_CA);
+
+
+    SCMCertificateClient scmCertificateClient =
+        new SCMCertificateClient(new SecurityConfig(conf));
+
+    CertificateClient.InitResponse response = scmCertificateClient.init();
+    Assert.assertEquals(CertificateClient.InitResponse.GETCERT, response);
+
+    // Generate cert
+    KeyPair keyPair =
+        new HDDSKeyGenerator(conf).generateKey();
+    PKCS10CertificationRequest csr = new CertificateSignRequest.Builder()
+        .addDnsName("hadoop.apache.org")
+        .addIpAddress("8.8.8.8")
+        .setCA(false)
+        .setSubject("testCA")
+        .setConfiguration(conf)
+        .setKey(keyPair)
+        .build();
+
+    Future<X509CertificateHolder> holder = rootCA.requestCertificate(csr,
+        CertificateApprover.ApprovalType.TESTING_AUTOMATIC, SCM);
+    Assert.assertTrue(holder.isDone());
+
+    X509CertificateHolder certificateHolder = holder.get();
+    Assert.assertNotNull(certificateHolder);
+
+    X509CertificateHolder rootCertHolder = rootCA.getCACertificate();
+
+    scmCertificateClient.storeCertificate(
+        CertificateCodec.getPEMEncodedString(rootCertHolder), true, true);
+
+    // Write to the location where Default CA Server reads from.
+    scmCertificateClient.storeCertificate(
+        CertificateCodec.getPEMEncodedString(certificateHolder), true);
+
+    CertificateCodec certCodec =
+        new CertificateCodec(new SecurityConfig(conf),
+            scmCertificateClient.getComponentName());
+    certCodec.writeCertificate(certificateHolder);
+
+    // The certificate generated by above cert client will be used by scmCA.
+    // Now scmCA init should be successful.
+    CertificateServer scmCA = new DefaultCAServer("scmCA",
+        clusterId, scmId, caStore, new DefaultProfile(),
+        scmCertificateClient.getComponentName());
+
+    try {
+      scmCA.init(new SecurityConfig(conf), INTERMEDIARY_CA);
+    } catch (Exception e) {
+      fail("testIntermediaryCA failed during init");
+    }
+
+  }
+
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
new file mode 100644
index 0000000..9b917db
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmNodeDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto;
+import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.server.SCMCertStore;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultCAServer;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultCAProfile;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse;
+import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
+import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
+import org.bouncycastle.cert.X509CertificateHolder;
+import org.bouncycastle.pkcs.PKCS10CertificationRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.security.KeyPair;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType.SCM;
+import static org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateApprover.ApprovalType.KERBEROS_TRUSTED;
+import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_ROOT_CA_COMPONENT_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_ROOT_CA_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_SUB_CA_PREFIX;
+
+public final class HASecurityUtils {
+
+  private HASecurityUtils() {
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(HASecurityUtils.class);
+
+  /**
+   * Initialize Security which generates public, private key pair and get SCM
+   * signed certificate and persist to local disk.
+   * @param scmStorageConfig
+   * @param fetchedScmId
+   * @param conf
+   * @param scmAddress
+   * @throws IOException
+   */
+  public static void initializeSecurity(SCMStorageConfig scmStorageConfig,
+      String fetchedScmId, OzoneConfiguration conf,
+      InetSocketAddress scmAddress, boolean primaryscm)
+      throws IOException {
+    LOG.info("Initializing secure StorageContainerManager.");
+
+    CertificateClient certClient =
+        new SCMCertificateClient(new SecurityConfig(conf));
+    InitResponse response = certClient.init();
+    LOG.info("Init response: {}", response);
+    switch (response) {
+    case SUCCESS:
+      LOG.info("Initialization successful.");
+      break;
+    case GETCERT:
+      if (!primaryscm) {
+        getRootCASignedSCMCert(certClient, conf, fetchedScmId, scmStorageConfig,
+            scmAddress);
+      } else {
+        getPrimarySCMSelfSignedCert(certClient, conf, fetchedScmId,
+            scmStorageConfig, scmAddress);
+      }
+      LOG.info("Successfully stored SCM signed certificate.");
+      break;
+    case FAILURE:
+      LOG.error("SCM security initialization failed.");
+      throw new RuntimeException("OM security initialization failed.");
+    case RECOVER:
+      LOG.error("SCM security initialization failed. SCM certificate is " +
+          "missing.");
+      throw new RuntimeException("SCM security initialization failed.");
+    default:
+      LOG.error("SCM security initialization failed. Init response: {}",
+          response);
+      throw new RuntimeException("SCM security initialization failed.");
+    }
+  }
+
+  /**
+   * For bootstrapped SCM get sub-ca signed certificate and root CA
+   * certificate using scm security client and store it using certificate
+   * client.
+   */
+  private static void getRootCASignedSCMCert(CertificateClient client,
+      OzoneConfiguration config, String fetchedSCMId,
+      SCMStorageConfig scmStorageConfig, InetSocketAddress scmAddress) {
+    try {
+      // Generate CSR.
+      PKCS10CertificationRequest csr = generateCSR(client, scmStorageConfig,
+          config, scmAddress, fetchedSCMId);
+
+      ScmNodeDetailsProto scmNodeDetailsProto =
+          ScmNodeDetailsProto.newBuilder()
+              .setClusterId(scmStorageConfig.getClusterID())
+              .setHostName(scmAddress.getHostName())
+              .setScmNodeId(fetchedSCMId).build();
+
+      // Create SCM security client.
+      SCMSecurityProtocolClientSideTranslatorPB secureScmClient =
+          HddsServerUtil.getScmSecurityClient(config);
+
+      // Get SCM sub CA cert.
+      SCMGetCertResponseProto response = secureScmClient.
+          getSCMCertChain(scmNodeDetailsProto, getEncodedString(csr));
+      String pemEncodedCert = response.getX509Certificate();
+
+      // Store SCM sub CA and root CA certificate.
+      if (response.hasX509CACertificate()) {
+        String pemEncodedRootCert = response.getX509CACertificate();
+        client.storeCertificate(pemEncodedRootCert, true, true);
+        client.storeCertificate(pemEncodedCert, true);
+
+        X509Certificate certificate =
+            CertificateCodec.getX509Certificate(pemEncodedCert);
+
+        persistSubCACertificate(config, client,
+            CertificateCodec.getCertificateHolder(certificate));
+
+        // Persist scm cert serial ID.
+        scmStorageConfig.setScmCertSerialId(certificate.getSerialNumber()
+            .toString());
+      } else {
+        throw new RuntimeException("Unable to retrieve SCM certificate chain");
+      }
+    } catch (IOException | CertificateException e) {
+      LOG.error("Error while fetching/storing SCM signed certificate.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  /**
+   * For primary SCM get sub-ca signed certificate and root CA certificate by
+   * root CA certificate server and store it using certificate client.
+   */
+  private static void getPrimarySCMSelfSignedCert(CertificateClient client,
+      OzoneConfiguration config, String fetchedSCMId,
+      SCMStorageConfig scmStorageConfig, InetSocketAddress scmAddress) {
+
+    try {
+
+      CertificateServer rootCAServer =
+          initializeRootCertificateServer(config, null, scmStorageConfig);
+
+      PKCS10CertificationRequest csr = generateCSR(client, scmStorageConfig,
+          config, scmAddress, fetchedSCMId);
+
+      X509CertificateHolder subSCMCertHolder = rootCAServer.
+          requestCertificate(csr, KERBEROS_TRUSTED, SCM).get();
+
+      X509CertificateHolder rootCACertificateHolder =
+          rootCAServer.getCACertificate();
+
+      String pemEncodedCert =
+          CertificateCodec.getPEMEncodedString(subSCMCertHolder);
+
+      String pemEncodedRootCert =
+          CertificateCodec.getPEMEncodedString(rootCACertificateHolder);
+
+
+      client.storeCertificate(pemEncodedRootCert, true, true);
+      client.storeCertificate(pemEncodedCert, true);
+
+
+      persistSubCACertificate(config, client, subSCMCertHolder);
+
+      // Persist scm cert serial ID.
+      scmStorageConfig.setScmCertSerialId(subSCMCertHolder.getSerialNumber()
+          .toString());
+    } catch (InterruptedException | ExecutionException| IOException |
+        CertificateException  e) {
+      LOG.error("Error while fetching/storing SCM signed certificate.", e);
+      throw new RuntimeException(e);
+    }
+
+  }
+
+  /**
+   * This function creates/initializes a certificate server as needed.
+   * This function is idempotent, so calling this again and again after the
+   * server is initialized is not a problem.
+   *
+   * @param config
+   * @param scmCertStore
+   * @param scmStorageConfig
+   */
+  public static CertificateServer initializeRootCertificateServer(
+      OzoneConfiguration config, SCMCertStore scmCertStore,
+      SCMStorageConfig scmStorageConfig)
+      throws IOException {
+    String subject = SCM_ROOT_CA_PREFIX +
+        InetAddress.getLocalHost().getHostName();
+
+    DefaultCAServer rootCAServer = new DefaultCAServer(subject,
+        scmStorageConfig.getClusterID(),
+        scmStorageConfig.getScmId(), scmCertStore, new DefaultCAProfile(),
+        SCM_ROOT_CA_COMPONENT_NAME);
+
+    rootCAServer.init(new SecurityConfig(config),
+        CertificateServer.CAType.SELF_SIGNED_CA);
+
+    return rootCAServer;
+  }
+
+  /**
+   * Generate CSR to obtain SCM sub CA certificate.
+   */
+  private static PKCS10CertificationRequest generateCSR(
+      CertificateClient client, SCMStorageConfig scmStorageConfig,
+      OzoneConfiguration config, InetSocketAddress scmAddress,
+      String fetchedSCMId) throws IOException {
+    CertificateSignRequest.Builder builder = client.getCSRBuilder();
+    KeyPair keyPair = new KeyPair(client.getPublicKey(),
+        client.getPrivateKey());
+
+    // Get host name.
+    String hostname = scmAddress.getAddress().getHostName();
+
+    String subject = SCM_SUB_CA_PREFIX + hostname;
+
+    builder.setKey(keyPair)
+        .setConfiguration(config)
+        .setScmID(fetchedSCMId)
+        .setClusterID(scmStorageConfig.getClusterID())
+        .setSubject(subject);
+
+
+    LOG.info("Creating csr for SCM->hostName:{},scmId:{},clusterId:{}," +
+            "subject:{}", hostname, fetchedSCMId,
+        scmStorageConfig.getClusterID(), subject);
+
+    return builder.build();
+  }
+
+  /**
+   * Persists the sub SCM signed certificate to the location which can be
+   * read by sub CA Certificate server.
+   * @param config
+   * @param certificateClient
+   * @param certificateHolder
+   * @throws IOException
+   */
+  private static void persistSubCACertificate(OzoneConfiguration config,
+      CertificateClient certificateClient,
+      X509CertificateHolder certificateHolder) throws IOException {
+    CertificateCodec certCodec =
+        new CertificateCodec(new SecurityConfig(config),
+            certificateClient.getComponentName());
+
+    certCodec.writeCertificate(certificateHolder);
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java
index 013dfe6..5d9f4de 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java
@@ -27,6 +27,7 @@
 import java.util.Properties;
 import java.util.UUID;
 
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_CERT_SERIAL_ID;
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID;
 import static org.apache.hadoop.ozone.OzoneConsts.STORAGE_DIR;
 
@@ -77,4 +78,20 @@
     return scmProperties;
   }
 
+  /**
+   * Sets the SCM Sub-CA certificate serial id.
+   * @param certSerialId
+   * @throws IOException
+   */
+  public void setScmCertSerialId(String certSerialId) throws IOException {
+    getStorageInfo().setProperty(SCM_CERT_SERIAL_ID, certSerialId);
+  }
+
+  /**
+   * Retrives the SCM Sub-CA certificate serial id from the version file.
+   * @return scm sub-CA certificate serial id
+   */
+  public String getScmCertSerialId() {
+    return getStorageInfo().getProperty(SCM_CERT_SERIAL_ID);
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index a4167c6..22d41fd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -65,6 +65,7 @@
 import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultProfile;
 import org.apache.hadoop.hdds.utils.HAUtils;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.ScmConfig;
@@ -139,6 +140,8 @@
 import static org.apache.hadoop.hdds.utils.HAUtils.checkSecurityAndSCMHAEnabled;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
 import static org.apache.hadoop.ozone.OzoneConsts.CRL_SEQUENCE_ID_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_ROOT_CA_COMPONENT_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_ROOT_CA_PREFIX;
 
 /**
  * StorageContainerManager is the main entry point for the service that
@@ -655,7 +658,8 @@
       String scmID) throws IOException {
     // TODO: Support Certificate Server loading via Class Name loader.
     // So it is easy to use different Certificate Servers if needed.
-    String subject = "scm@" + InetAddress.getLocalHost().getHostName();
+    String subject = SCM_ROOT_CA_PREFIX +
+        InetAddress.getLocalHost().getHostName();
     if(this.scmMetadataStore == null) {
       LOG.error("Cannot initialize Certificate Server without a valid meta " +
           "data layer.");
@@ -668,7 +672,8 @@
             .setRatisServer(scmHAManager.getRatisServer())
             .setCRLSequenceId(getLastSequenceIdForCRL()).build();
 
-    return new DefaultCAServer(subject, clusterID, scmID, certStore);
+    return new DefaultCAServer(subject, clusterID, scmID, certStore,
+        new DefaultProfile(), SCM_ROOT_CA_COMPONENT_NAME);
   }
 
   long getLastSequenceIdForCRL() throws IOException {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
index 5910b2c..feaf633 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
@@ -183,4 +183,9 @@
     return securityConfig.getProvider();
   }
 
+  @Override
+  public String getComponentName() {
+    return null;
+  }
+
 }