HDDS-4998. [SCM HA Security] Make storeValidCertificate method idempotent (#2063)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java
index b1de2c6..0a669b9 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java
@@ -55,6 +55,14 @@
       X509Certificate certificate, NodeType role) throws IOException;
 
   /**
+   * Check certificate serialID exists or not. If exists throws an exception.
+   * @param serialID
+   * @throws IOException
+   */
+  void checkValidCertID(BigInteger serialID) throws IOException;
+
+
+  /**
    * Adds the certificates to be revoked to a new CRL and moves all the
    * certificates in a transactional manner from valid certificate to
    * revoked certificate state. Returns an empty {@code Optional} instance if
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
index 7ea28b6..39f610c 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
@@ -89,7 +89,8 @@
       Date validTill,
       PKCS10CertificationRequest certificationRequest,
       String scmId,
-      String clusterId) throws IOException, OperatorCreationException {
+      String clusterId) throws IOException,
+      OperatorCreationException {
 
     AlgorithmIdentifier sigAlgId = new
         DefaultSignatureAlgorithmIdentifierFinder().find(
@@ -135,7 +136,7 @@
         new X509v3CertificateBuilder(
             caCertificate.getSubject(),
             // Serial is not sequential but it is monotonically increasing.
-            BigInteger.valueOf(Time.monotonicNowNanos()),
+            BigInteger.valueOf(generateSerialId()),
             validFrom,
             validTill,
             x500Name, keyInfo);
@@ -155,6 +156,12 @@
 
   }
 
+  public long generateSerialId() {
+    // TODO: to make generation of serialId distributed.
+    // This issue will be fixed in HDDS-4999.
+    return Time.monotonicNowNanos();
+  }
+
   @Override
   public CompletableFuture<X509CertificateHolder> inspectCSR(String csr)
       throws IOException {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
index 2cd8993..dc70e4f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
@@ -60,6 +60,8 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 
 import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getCertificationRequest;
@@ -129,6 +131,7 @@
   private CertificateApprover approver;
   private CRLApprover crlApprover;
   private CertificateStore store;
+  private Lock lock;
 
   /**
    * Create an Instance of DefaultCAServer.
@@ -143,6 +146,7 @@
     this.clusterID = clusterID;
     this.scmID = scmID;
     this.store = certificateStore;
+    lock = new ReentrantLock();
   }
 
   @Override
@@ -265,12 +269,19 @@
       LocalDate endDate, PKCS10CertificationRequest csr, NodeType role)
       throws IOException,
       OperatorCreationException, CertificateException {
-    X509CertificateHolder xcert = approver.sign(config,
-        getCAKeys().getPrivate(),
-        getCACertificate(), java.sql.Date.valueOf(beginDate),
-        java.sql.Date.valueOf(endDate), csr, scmID, clusterID);
-    store.storeValidCertificate(xcert.getSerialNumber(),
-        CertificateCodec.getX509Certificate(xcert), role);
+    lock.lock();
+    X509CertificateHolder xcert;
+    try {
+      xcert = approver.sign(config,
+          getCAKeys().getPrivate(),
+          getCACertificate(), java.sql.Date.valueOf(beginDate),
+          java.sql.Date.valueOf(endDate), csr, scmID, clusterID);
+      store.checkValidCertID(xcert.getSerialNumber());
+      store.storeValidCertificate(xcert.getSerialNumber(),
+          CertificateCodec.getX509Certificate(xcert), role);
+    } finally {
+      lock.unlock();
+    }
     return xcert;
   }
 
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java
index 366b285..c60c975 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java
@@ -43,6 +43,11 @@
   }
 
   @Override
+  public void checkValidCertID(BigInteger serialID) throws IOException {
+
+  }
+
+  @Override
   public Optional<Long> revokeCertificates(
       List<BigInteger> serialIDs,
       X509CertificateHolder caCertificateHolder,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
index afbfc07..ee4bc20 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
@@ -80,7 +80,6 @@
     lock.lock();
     try {
       // This makes sure that no certificate IDs are reusable.
-      checkValidCertID(serialID);
       if (role == SCM) {
         // If the role is SCM, store certificate in scm cert table
         // and valid cert table. This is to help to return scm certs during
@@ -106,7 +105,6 @@
       X509Certificate certificate) throws IOException {
     lock.lock();
     try {
-      checkValidCertID(serialID);
       BatchOperation batchOperation =
           scmMetadataStore.getBatchHandler().initBatchOperation();
       scmMetadataStore.getValidSCMCertsTable().putWithBatch(batchOperation,
@@ -119,10 +117,15 @@
     }
   }
 
-  private void checkValidCertID(BigInteger serialID) throws IOException {
-    if ((getCertificateByID(serialID, VALID_CERTS) != null) ||
-        (getCertificateByID(serialID, CertType.REVOKED_CERTS) != null)) {
-      throw new SCMSecurityException("Conflicting certificate ID");
+  public void checkValidCertID(BigInteger serialID) throws IOException {
+    lock.lock();
+    try {
+      if ((getCertificateByID(serialID, VALID_CERTS) != null) ||
+          (getCertificateByID(serialID, CertType.REVOKED_CERTS) != null)) {
+        throw new SCMSecurityException("Conflicting certificate ID" + serialID);
+      }
+    } finally {
+      lock.unlock();
     }
   }