HDDS-7022. EC: Open EC containers are not closed when the SCM container was already closed. (#3668)
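
When an EC container is already CLOSED in SCM but some of its replicas
are still OPEN or CLOSING on the datanodes, ReplicationManager now
detects the mismatch during the container health check and force-closes
those replicas by sending a close-container command to the affected
datanodes.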

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 79cf324..5b8d6da 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -42,7 +43,10 @@
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
@@ -53,6 +57,7 @@
 import java.time.Clock;
 import java.time.Duration;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -64,6 +69,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
 import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
@@ -401,6 +407,16 @@
       return new ContainerHealthResult.HealthyResult(containerInfo);
     }
 
+    if (containerInfo.getState() == HddsProtos.LifeCycleState.CLOSED) {
+      List<ContainerReplica> unhealthyReplicas = replicas.stream()
+          .filter(r -> !compareState(containerInfo.getState(), r.getState()))
+          .collect(Collectors.toList());
+
+      if (!unhealthyReplicas.isEmpty()) {
+        handleUnhealthyReplicas(containerInfo, unhealthyReplicas);
+      }
+    }
+
     List<ContainerReplicaOp> pendingOps =
         containerReplicaPendingOps.getPendingOps(containerID);
     ContainerHealthResult health = ecContainerHealthCheck
@@ -433,6 +449,67 @@
   }
 
   /**
+   * Handles the unhealthy replicas of a container.
+   * A container is inconsistent if any replica's state does not
+   * match the container's state. We have to take appropriate action
+   * based on the state of each replica.
+   *
+   * @param container ContainerInfo
+   * @param unhealthyReplicas List of ContainerReplica
+   */
+  private void handleUnhealthyReplicas(final ContainerInfo container,
+      List<ContainerReplica> unhealthyReplicas) {
+    Iterator<ContainerReplica> iterator = unhealthyReplicas.iterator();
+    while (iterator.hasNext()) {
+      final ContainerReplica replica = iterator.next();
+      final ContainerReplicaProto.State state = replica.getState();
+      if (state == State.OPEN || state == State.CLOSING) {
+        sendCloseCommand(container, replica.getDatanodeDetails(), true);
+        iterator.remove();
+      }
+    }
+  }
+
+  /**
+   * Sends a close-container command for the given container to the
+   * given datanode.
+   *
+   * @param container Container to be closed
+   * @param datanode The datanode on which the container
+   *                  has to be closed
+   * @param force Set to true to force-close the container.
+   */
+  private void sendCloseCommand(final ContainerInfo container,
+      final DatanodeDetails datanode, final boolean force) {
+
+    ContainerID containerID = container.containerID();
+    LOG.info("Sending close container command for container {}" +
+        " to datanode {}.", containerID, datanode);
+    CloseContainerCommand closeContainerCommand =
+        new CloseContainerCommand(container.getContainerID(),
+            container.getPipelineID(), force);
+    try {
+      closeContainerCommand.setTerm(scmContext.getTermOfLeader());
+    } catch (NotLeaderException nle) {
+      LOG.warn("Skip sending close container command,"
+          + " since current SCM is not leader.", nle);
+      return;
+    }
+    closeContainerCommand.setEncodedToken(getContainerToken(containerID));
+    eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+        new CommandForDatanode<>(datanode.getUuid(), closeContainerCommand));
+  }
+
+  private String getContainerToken(ContainerID containerID) {
+    if (scmContext.getScm() instanceof StorageContainerManager) {
+      StorageContainerManager scm =
+          (StorageContainerManager) scmContext.getScm();
+      return scm.getContainerTokenGenerator().generateEncodedToken(containerID);
+    }
+    return ""; // unit test
+  }
+
+  /**
    * Creates a priority queue of UnderReplicatedHealthResult, where the elements
    * are ordered by the weighted redundancy of the container. This means that
    * containers with the least remaining redundancy are at the front of the
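
Note: the compareState helper used by the new CLOSED-container check
(and by the updated test below) is not part of this diff; it already
exists in ReplicationManager. As a rough sketch of what it does, under
the assumption that it simply mirrors lifecycle states onto replica
states (the actual branches may differ):

  public static boolean compareState(
      final HddsProtos.LifeCycleState containerState,
      final ContainerReplicaProto.State replicaState) {
    // A replica is consistent only when its state mirrors the
    // container's lifecycle state; anything else is unhealthy.
    switch (containerState) {
    case OPEN:
      return replicaState == ContainerReplicaProto.State.OPEN;
    case CLOSING:
      return replicaState == ContainerReplicaProto.State.CLOSING;
    case QUASI_CLOSED:
      return replicaState == ContainerReplicaProto.State.QUASI_CLOSED;
    case CLOSED:
      return replicaState == ContainerReplicaProto.State.CLOSED;
    default:
      // DELETING/DELETED containers have no matching replica state.
      return false;
    }
  }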
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
index da49bef..38ec64c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
@@ -23,15 +23,17 @@
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.utils.HAUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.BucketArgs;
@@ -43,9 +45,9 @@
 import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.ozone.test.GenericTestUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -54,6 +56,7 @@
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
@@ -78,7 +81,7 @@
   /**
    * Create a MiniDFSCluster for testing.
    */
-  @BeforeClass
+  @BeforeAll
   public static void init() throws Exception {
     chunkSize = 1024;
     flushSize = 2 * chunkSize;
@@ -125,7 +128,7 @@
   /**
    * Shutdown MiniDFSCluster.
    */
-  @AfterClass
+  @AfterAll
   public static void shutdown() {
     if (cluster != null) {
       cluster.shutdown();
@@ -180,30 +183,30 @@
     List<ContainerInfo> containers =
         cluster.getStorageContainerManager().getContainerManager()
             .getContainers();
-    long containerID = 0;
+    ContainerInfo container = null;
     for (ContainerInfo info : containers) {
       if (info.getPipelineID().getId().equals(pipeline.getId().getId())) {
-        containerID = info.containerID().getProtobuf().getId();
+        container = info;
       }
     }
     StorageContainerManager scm = cluster.getStorageContainerManager();
-    StorageContainerLocationProtocol scmContainerClient =
-        HAUtils.getScmContainerClient(cluster.getConf());
-    scmContainerClient.closeContainer(containerID);
-    // Make sure replica closed.
-    waitForDNContainerState(ContainerProtos.ContainerDataProto.State.CLOSED,
-        pipeline, containerID);
+
+    // Shutting down the DN triggers pipeline close and container close.
+    cluster.shutdownHddsDatanode(pipeline.getFirstNode());
+
+    // Make sure the container is closed at SCM.
+    waitForSCMContainerState(StorageContainerDatanodeProtocolProtos
+        .ContainerReplicaProto.State.CLOSED, container.containerID());
     //Temporarily stop the RM process.
     scm.getReplicationManager().stop();
 
-    // Stop the DN and wait for the lower replication.
-    cluster.shutdownHddsDatanode(pipeline.getFirstNode());
-    waitForContainerCount(4, containerID, scm);
+    // Wait for the replica count to drop after the DN shutdown.
+    waitForContainerCount(4, container.containerID(), scm);
 
     // Start the RM to resume the replication process and wait for the
     // reconstruction.
     scm.getReplicationManager().start();
-    waitForContainerCount(5, containerID, scm);
+    waitForContainerCount(5, container.containerID(), scm);
 
     // Let's verify for Over replications now.
     //Temporarily stop the RM process.
@@ -213,35 +216,56 @@
     // increased.
     cluster.restartHddsDatanode(pipeline.getFirstNode(), true);
     // Check container is over replicated.
-    waitForContainerCount(6, containerID, scm);
+    waitForContainerCount(6, container.containerID(), scm);
+    // Wait for all the replicas to be closed.
+    container = scm.getContainerInfo(container.getContainerID());
+    waitForDNContainerState(container, scm);
 
     // Resume RM and wait the over replicated replica deleted.
     scm.getReplicationManager().start();
-    waitForContainerCount(5, containerID, scm);
+    waitForContainerCount(5, container.containerID(), scm);
   }
 
-  private void waitForDNContainerState(
-      ContainerProtos.ContainerDataProto.State state, Pipeline pipeline,
-      long containerID) throws TimeoutException, InterruptedException {
-    //Wait until container closed at DN
+  private void waitForDNContainerState(ContainerInfo container,
+      StorageContainerManager scm) throws InterruptedException,
+      TimeoutException {
     GenericTestUtils.waitFor(() -> {
       try {
-        return cluster.getHddsDatanode(pipeline.getFirstNode())
-            .getDatanodeStateMachine().getContainer().getContainerSet()
-            .getContainer(containerID).getContainerState() == state;
+        List<ContainerReplica> unhealthyReplicas = scm.getContainerManager()
+            .getContainerReplicas(container.containerID()).stream()
+            .filter(r -> !ReplicationManager
+                .compareState(container.getState(), r.getState()))
+            .collect(Collectors.toList());
+        return unhealthyReplicas.isEmpty();
+      } catch (ContainerNotFoundException e) {
+        return false;
+      }
+    }, 100, 100000);
+  }
+
+  private void waitForSCMContainerState(
+      StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State state,
+      ContainerID containerID) throws TimeoutException, InterruptedException {
+    // Wait until the container is closed at SCM.
+    GenericTestUtils.waitFor(() -> {
+      try {
+        HddsProtos.LifeCycleState containerState = cluster
+            .getStorageContainerManager().getContainerManager()
+            .getContainer(containerID).getState();
+        return ReplicationManager.compareState(containerState, state);
       } catch (IOException e) {
         return false;
       }
     }, 100, 100000);
   }
 
-  private void waitForContainerCount(int count, long containerID,
+  private void waitForContainerCount(int count, ContainerID containerID,
       StorageContainerManager scm)
       throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(() -> {
       try {
         return scm.getContainerManager()
-            .getContainerReplicas(ContainerID.valueOf(containerID))
+            .getContainerReplicas(containerID)
             .size() == count;
       } catch (ContainerNotFoundException e) {
         return false;