HDDS-5033. SCM may not be able to know full port list of Datanode after Datanode is started. (#2090)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index 3791483..7faa741 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -199,14 +199,14 @@
    *
    * @param port DataNode port
    */
-  public void setPort(Port port) {
+  public synchronized void setPort(Port port) {
     // If the port is already in the list remove it first and add the
     // new/updated port value.
     ports.remove(port);
     ports.add(port);
   }
 
-  public void setPort(Name name, int port) {
+  public synchronized void setPort(Name name, int port) {
     setPort(new Port(name, port));
   }
 
@@ -215,7 +215,7 @@
    *
    * @return DataNode Ports
    */
-  public List<Port> getPorts() {
+  public synchronized List<Port> getPorts() {
     return ports;
   }
 
@@ -266,7 +266,7 @@
    *
    * @return Port
    */
-  public Port getPort(Port.Name name) {
+  public synchronized Port getPort(Port.Name name) {
     for (Port port : ports) {
       if (port.getName().equals(name)) {
         return port;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 5bb9fb1..9d503aa 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -26,7 +26,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -64,6 +64,7 @@
 import com.google.common.collect.Maps;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
+
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,10 +90,14 @@
   private List<ContainerDataScanner> dataScanners;
   private final BlockDeletingService blockDeletingService;
   private final GrpcTlsConfig tlsClientConfig;
-  private final AtomicBoolean isStarted;
+  private final AtomicReference<InitializingStatus> initializingStatus;
   private final ReplicationServer replicationServer;
   private DatanodeDetails datanodeDetails;
 
+  enum InitializingStatus {
+    UNINITIALIZED, INITIALIZING, INITIALIZED
+  }
+
   /**
    * Construct OzoneContainer object.
    *
@@ -178,7 +183,8 @@
     tlsClientConfig = RatisHelper.createTlsClientConfig(secConf,
         x509Certificates);
 
-    isStarted = new AtomicBoolean(false);
+    initializingStatus =
+        new AtomicReference<>(InitializingStatus.UNINITIALIZED);
   }
 
   public GrpcTlsConfig getTlsClientConfig() {
@@ -265,10 +271,24 @@
    * @throws IOException
    */
   public void start(String clusterId) throws IOException {
-    if (!isStarted.compareAndSet(false, true)) {
+    // If SCM HA is enabled, OzoneContainer#start() will be called multi-times
+    // from VersionEndpointTask. The first call should do the initializing job,
+    // the successive calls should wait until OzoneContainer is initialized.
+    if (!initializingStatus.compareAndSet(
+        InitializingStatus.UNINITIALIZED, InitializingStatus.INITIALIZING)) {
+
+      // wait OzoneContainer to finish its initializing.
+      while (initializingStatus.get() != InitializingStatus.INITIALIZED) {
+        try {
+          Thread.sleep(1);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
       LOG.info("Ignore. OzoneContainer already started.");
       return;
     }
+
     LOG.info("Attempting to start container services.");
     startContainerScrub();
 
@@ -280,6 +300,9 @@
     hddsDispatcher.init();
     hddsDispatcher.setClusterId(clusterId);
     blockDeletingService.start();
+
+    // mark OzoneContainer as INITIALIZED.
+    initializingStatus.set(InitializingStatus.INITIALIZED);
   }
 
   /**