HDDS-4642. SCM security protocol support for query CRLs and latest CRL id for OM and Datanode. (#2030)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java
index 778a6d4..d11dd8b 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmNodeDetailsProto;
 import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.security.KerberosInfo;
 
 /**
@@ -132,4 +133,18 @@
    */
   List<String> listCACertificate() throws IOException;
 
+  /**
+   * Get the CRLInfo based on the CRL Id.
+   * @param crlIds - crl ids
+   * @return list of CRLInfo
+   * @throws IOException
+   */
+  List<CRLInfo> getCrls(List<Long> crlIds) throws IOException;
+
+  /**
+   * Get the latest CRL id.
+   * @return latest CRL id.
+   */
+  long getLatestCrlId() throws IOException;
+
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
index 67d4daf..e034f7b 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
@@ -18,12 +18,16 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.security.cert.CRLException;
+import java.security.cert.CertificateException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Consumer;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.CRLInfoProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmNodeDetailsProto;
@@ -32,8 +36,10 @@
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCACertificateRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertificateRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCrlsRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCACertificateRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetLatestCrlIdRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCertificateRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityRequest;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityRequest.Builder;
@@ -41,6 +47,7 @@
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.Type;
 import org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.ProtobufHelper;
@@ -319,6 +326,36 @@
         .getListCertificateResponseProto().getCertificatesList();
   }
 
+  @Override
+  public List<CRLInfo> getCrls(List<Long> crlIds) throws IOException {
+    SCMGetCrlsRequestProto protoIns = SCMGetCrlsRequestProto
+        .newBuilder()
+        .addAllCrlId(crlIds)
+        .build();
+    List<CRLInfoProto> crlInfoProtoList = submitRequest(Type.GetCrls,
+        builder -> builder.setGetCrlsRequest(protoIns))
+        .getGetCrlsResponseProto().getCrlInfosList();
+    List<CRLInfo> result = new ArrayList<>();
+    for (CRLInfoProto crlProto : crlInfoProtoList) {
+      try {
+        CRLInfo crlInfo = CRLInfo.fromProtobuf(crlProto);
+        result.add(crlInfo);
+      } catch (CRLException | CertificateException e) {
+        throw new SCMSecurityException("Fail to parse CRL info", e);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public long getLatestCrlId() throws IOException {
+    SCMGetLatestCrlIdRequestProto protoIns =  SCMGetLatestCrlIdRequestProto
+        .getDefaultInstance();
+    return submitRequest(Type.GetLatestCrlId,
+        builder -> builder.setGetLatestCrlIdRequest(protoIns))
+        .getGetLatestCrlIdResponseProto().getCrlId();
+  }
+
   /**
    * Return the proxy object underlying this protocol translator.
    *
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java
index 4fa60d7..3912d71 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateApprover.ApprovalType;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.bouncycastle.asn1.x509.CRLReason;
 import org.bouncycastle.cert.X509CertificateHolder;
 import org.bouncycastle.pkcs.PKCS10CertificationRequest;
@@ -139,6 +140,20 @@
   void reinitialize(SCMMetadataStore scmMetadataStore);
 
   /**
+   * Get the CRLInfo based on the CRL Ids.
+   * @param crlIds - list of crl ids
+   * @return CRLInfo
+   * @throws IOException
+   */
+  List<CRLInfo> getCrls(List<Long> crlIds) throws IOException;
+
+  /**
+   * Get the latest CRL id.
+   * @return latest CRL id.
+   */
+  long getLatestCrlId();
+
+  /**
    * Make it explicit what type of CertificateServer we are creating here.
    */
   enum CAType {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java
index 30a1946..98fa8b2 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hdds.security.x509.certificate.authority;
 
 import org.apache.hadoop.hdds.scm.metadata.Replicate;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.hdds.security.x509.certificate.CertInfo;
 import org.bouncycastle.asn1.x509.CRLReason;
 import org.bouncycastle.cert.X509CertificateHolder;
@@ -143,6 +144,20 @@
   void reinitialize(SCMMetadataStore metadataStore);
 
   /**
+   * Get the CRLInfo based on the CRL Ids.
+   * @param crlIds - list of crl ids
+   * @return CRLInfo
+   * @throws IOException
+   */
+  List<CRLInfo> getCrls(List<Long> crlIds) throws IOException;
+
+  /**
+   * Get the latest CRL id.
+   * @return latest CRL id.
+   */
+  long getLatestCrlId();
+
+  /**
    * Different kind of Certificate stores.
    */
   enum CertType {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
index 7a83509..ae2ca01 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.PKIProfile;
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.security.x509.certificates.utils.SelfSignedCertificate;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
 import org.apache.hadoop.hdds.security.x509.keys.KeyCodec;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
@@ -334,6 +335,22 @@
   }
 
   /**
+   * Get the CRLInfo based on the CRL Ids.
+   * @param crlIds - list of crl ids
+   * @return CRLInfo
+   * @throws IOException
+   */
+  @Override
+  public List<CRLInfo> getCrls(List<Long> crlIds) throws IOException {
+    return store.getCrls(crlIds);
+  }
+
+  @Override
+  public long getLatestCrlId() {
+    return store.getLatestCrlId();
+  }
+
+  /**
    * Generates a Self Signed CertificateServer. These are the steps in
    * generating a Self-Signed CertificateServer.
    * <p>
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
index fb51227..8a093d6 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hdds.security.x509.certificate.client;
 
 import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
 
 import java.io.IOException;
@@ -261,4 +262,20 @@
    */
   List<String> updateCAList() throws IOException;
 
+
+  /**
+   * Get the CRLInfo based on the CRL Ids.
+   * @param crlIds - list of crl ids
+   * @return list of CRLInfo
+   * @throws IOException
+   */
+  List<CRLInfo> getCrls(List<Long> crlIds) throws IOException;
+
+  /**
+   * Get the latest CRL id.
+   * @return latest CRL id.
+   * @throws IOException
+   */
+  long getLatestCrlId() throws IOException;
+
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
index 19c0e78..ba68ca6 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
@@ -46,7 +46,7 @@
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
-import org.apache.hadoop.hdds.utils.HddsServerUtil;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
@@ -68,6 +68,8 @@
 import static org.apache.hadoop.hdds.security.x509.exceptions.CertificateException.ErrorCode.CRYPTO_SIGNATURE_VERIFICATION_ERROR;
 import static org.apache.hadoop.hdds.security.x509.exceptions.CertificateException.ErrorCode.CRYPTO_SIGN_ERROR;
 import static org.apache.hadoop.hdds.security.x509.exceptions.CertificateException.ErrorCode.CSR_ERROR;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClient;
+
 import org.bouncycastle.cert.X509CertificateHolder;
 import org.slf4j.Logger;
 
@@ -284,6 +286,33 @@
     return this.getCertificateFromScm(certId);
   }
 
+  @Override
+  public List<CRLInfo> getCrls(List<Long> crlIds) throws IOException {
+    try {
+      SCMSecurityProtocol scmSecurityProtocolClient = getScmSecurityClient(
+          securityConfig.getConfiguration());
+      return scmSecurityProtocolClient.getCrls(crlIds);
+    } catch (Exception e) {
+      getLogger().error("Error while getting CRL with " +
+          "CRL ids:{} from scm.", crlIds, e);
+      throw new CertificateException("Error while getting CRL with " +
+          "CRL ids:" + crlIds, e);
+    }
+  }
+
+  @Override
+  public long getLatestCrlId() throws IOException {
+    try {
+      SCMSecurityProtocol scmSecurityProtocolClient = getScmSecurityClient(
+          securityConfig.getConfiguration());
+      return scmSecurityProtocolClient.getLatestCrlId();
+    } catch (Exception e) {
+      getLogger().error("Error while getting latest CRL id from scm.", e);
+      throw new CertificateException("Error while getting latest CRL id from" +
+          " scm.", e);
+    }
+  }
+
   /**
    * Get certificate from SCM and store it in local file system.
    * @param certId
@@ -296,7 +325,7 @@
         certId);
     try {
       SCMSecurityProtocol scmSecurityProtocolClient =
-          HddsServerUtil.getScmSecurityClient(
+          getScmSecurityClient(
           (OzoneConfiguration) securityConfig.getConfiguration());
       String pemEncodedCert =
           scmSecurityProtocolClient.getCertificate(certId);
@@ -899,8 +928,7 @@
     lock.lock();
     try {
       SCMSecurityProtocol scmSecurityProtocolClient =
-          HddsServerUtil.getScmSecurityClient(
-              securityConfig.getConfiguration());
+          getScmSecurityClient(securityConfig.getConfiguration());
       pemEncodedCACerts =
           scmSecurityProtocolClient.listCACertificate();
       return pemEncodedCACerts;
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java
index 9c473b9..ec15722 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hdds.security.x509.certificate.authority;
 
 import org.apache.hadoop.hdds.security.x509.certificate.CertInfo;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.bouncycastle.asn1.x509.CRLReason;
 import org.bouncycastle.cert.X509CertificateHolder;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
@@ -90,4 +91,14 @@
 
   @Override
   public void reinitialize(SCMMetadataStore metadataStore) {}
+
+  @Override
+  public List<CRLInfo> getCrls(List<Long> crlIds) throws IOException {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public long getLatestCrlId() {
+    return 0;
+  }
 }
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
index e270b27..557d7f2 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
@@ -51,6 +51,8 @@
     optional SCMListCertificateRequestProto listCertificateRequest = 7;
     optional SCMGetSCMCertRequestProto getSCMCertificateRequest = 8;
     optional SCMListCACertificateRequestProto listCACertificateRequestProto = 9;
+    optional SCMGetCrlsRequestProto getCrlsRequest = 10;
+    optional SCMGetLatestCrlIdRequestProto getLatestCrlIdRequest = 11;
 
 }
 
@@ -71,6 +73,10 @@
 
     optional SCMListCertificateResponseProto listCertificateResponseProto = 7;
 
+    optional SCMGetCrlsResponseProto getCrlsResponseProto = 8;
+
+    optional SCMGetLatestCrlIdResponseProto getLatestCrlIdResponseProto = 9;
+
 }
 
 enum Type {
@@ -82,6 +88,8 @@
     GetSCMCertificate = 6;
     GetRootCACertificate = 7;
     ListCACertificate = 8;
+    GetCrls = 9;
+    GetLatestCrlId = 10;
 }
 
 enum Status {
@@ -182,6 +190,27 @@
 message SCMListCACertificateRequestProto {
 }
 
+/**
+* Proto request to get CRL.
+*/
+message SCMGetCrlsRequestProto {
+    repeated int64 crlId = 1;
+}
+
+message SCMGetCrlsResponseProto {
+    repeated CRLInfoProto crlInfos = 1;
+}
+
+/**
+* Proto request to get latest CRL id.
+*/
+message SCMGetLatestCrlIdRequestProto {
+}
+
+message SCMGetLatestCrlIdResponseProto {
+    optional int64 crlId = 1;
+}
+
 service SCMSecurityProtocolService {
     rpc submitRequest (SCMSecurityRequest) returns (SCMSecurityResponse);
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
index 6bd50a4..6829f95 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
@@ -24,7 +24,11 @@
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto.ResponseCode;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertificateRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCrlsRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCrlsResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetLatestCrlIdRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetLatestCrlIdResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetOMCertRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSCMCertRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCertificateRequestProto;
@@ -37,6 +41,7 @@
 import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 
@@ -127,6 +132,17 @@
       case ListCACertificate:
         return scmSecurityResponse.setListCertificateResponseProto(
             listCACertificate()).build();
+      case GetCrls:
+        return SCMSecurityResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setGetCrlsResponseProto(getCrls(request.getGetCrlsRequest()))
+            .build();
+      case GetLatestCrlId:
+        return SCMSecurityResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setGetLatestCrlIdResponseProto(getLatestCrlId(
+                request.getGetLatestCrlIdRequest()))
+            .build();
       default:
         throw new IllegalArgumentException(
             "Unknown request type: " + request.getCmdType());
@@ -284,7 +300,31 @@
             .addAllCertificates(certs);
     return builder.build();
 
+  }
 
+  public SCMGetCrlsResponseProto getCrls(
+      SCMGetCrlsRequestProto request) throws IOException {
+    List<CRLInfo> crls = impl.getCrls(request.getCrlIdList());
+    SCMGetCrlsResponseProto.Builder builder =
+        SCMGetCrlsResponseProto.newBuilder();
+    for (CRLInfo crl : crls) {
+      try {
+        builder.addCrlInfos(crl.getProtobuf());
+      } catch (SCMSecurityException e) {
+        LOG.error("Fail in parsing CRL info", e);
+        throw new SCMSecurityException("Fail in parsing CRL info", e);
+      }
+    }
+    return builder.build();
+  }
+
+  public SCMGetLatestCrlIdResponseProto getLatestCrlId(
+      SCMGetLatestCrlIdRequestProto request) throws IOException {
+    SCMGetLatestCrlIdResponseProto.Builder builder =
+        SCMGetLatestCrlIdResponseProto
+            .newBuilder().
+            setCrlId(impl.getLatestCrlId());
+    return builder.build();
   }
 
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
index 2aaa88c..195cad6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
@@ -336,4 +336,27 @@
 
     }
   }
+
+  @Override
+  public List<CRLInfo> getCrls(List<Long> crlIds) throws IOException {
+    List<CRLInfo> results = new ArrayList<>();
+    for (Long crlId : crlIds) {
+      try {
+        CRLInfo crlInfo =
+            scmMetadataStore.getCRLInfoTable().get(crlId);
+        results.add(crlInfo);
+      } catch (IOException e) {
+        LOG.error("Fail to get CRLs from SCM metadata store for crlId: "
+            + crlId, e);
+        throw new SCMSecurityException("Fail to get CRLs from SCM metadata " +
+            "store for crlId: " + crlId, e);
+      }
+    }
+    return results;
+  }
+
+  @Override
+  public long getLatestCrlId() {
+    return crlSequenceId.get();
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
index fc820a4..4e04cb1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
 import org.apache.hadoop.hdds.scm.protocol.SCMSecurityProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -322,6 +323,16 @@
     return null;
   }
 
+  @Override
+  public List<CRLInfo> getCrls(List<Long> crlIds) throws IOException {
+    return scmCertificateServer.getCrls(crlIds);
+  }
+
+  @Override
+  public long getLatestCrlId() {
+    return scmCertificateServer.getLatestCrlId();
+  }
+
   public RPC.Server getRpcServer() {
     return rpcServer;
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMCertStore.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMCertStore.java
index 4e85449..fa3ac9f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMCertStore.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMCertStore.java
@@ -47,6 +47,7 @@
 import java.security.cert.X509CRLEntry;
 import java.security.cert.X509Certificate;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
@@ -155,6 +156,13 @@
     assertTrue("Timestamp should be greater than 0",
         certInfo.getTimestamp() > 0L);
 
+    long crlId = scmCertStore.getLatestCrlId();
+    assertEquals(sequenceId.get().longValue(), crlId);
+
+    List<CRLInfo> crls = scmCertStore.getCrls(Arrays.asList(crlId));
+    assertEquals(1, crls.size());
+
+
     // CRL Info table should have a CRL with sequence id
     assertNotNull(scmMetadataStore.getCRLInfoTable()
         .get(sequenceId.get()));
@@ -163,8 +171,7 @@
     assertEquals(INITIAL_SEQUENCE_ID + 1L, (long)
         scmMetadataStore.getCRLSequenceIdTable().get(CRL_SEQUENCE_ID_KEY));
 
-    CRLInfo crlInfo =
-        scmMetadataStore.getCRLInfoTable().get(sequenceId.get());
+    CRLInfo crlInfo = crls.get(0);
 
     Set<? extends X509CRLEntry> revokedCertificates =
         crlInfo.getX509CRL().getRevokedCertificates();
@@ -199,14 +206,16 @@
     // This should create a CRL with sequence id INITIAL_SEQUENCE_ID + 2
     // And contain 2 certificates in it
     assertTrue(sequenceId.isPresent());
+    assertEquals(sequenceId.get().longValue(),
+        scmCertStore.getLatestCrlId());
     assertEquals(INITIAL_SEQUENCE_ID + 2L, (long) sequenceId.get());
 
     // Check the sequence ID table for latest sequence id
     assertEquals(INITIAL_SEQUENCE_ID + 2L, (long)
         scmMetadataStore.getCRLSequenceIdTable().get(CRL_SEQUENCE_ID_KEY));
 
-    CRLInfo newCrlInfo = scmMetadataStore.getCRLInfoTable()
-        .get(sequenceId.get());
+    CRLInfo newCrlInfo = scmCertStore.getCrls(Arrays.asList(
+        INITIAL_SEQUENCE_ID + 2)).get(0);
     revokedCertificates = newCrlInfo.getX509CRL().getRevokedCertificates();
     assertEquals(2L, revokedCertificates.size());
     assertNotNull(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
index 97ecdd5..6bac924 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
@@ -25,6 +25,7 @@
 import java.security.cert.X509Certificate;
 import java.time.LocalDate;
 import java.time.temporal.ChronoUnit;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -32,6 +33,7 @@
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
 import org.apache.hadoop.hdds.security.x509.certificates.utils.SelfSignedCertificate;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
 
@@ -212,4 +214,15 @@
   public List<String> updateCAList() throws IOException  {
     return null;
   }
+
+  @Override
+  public List<CRLInfo> getCrls(List<Long> crlIds) throws IOException {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public long getLatestCrlId() throws IOException {
+    return 0;
+  }
+
 }