HDDS-7192. EC: ReplicationManager - create handlers to perform various container checks (#3743)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerCheckRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerCheckRequest.java
new file mode 100644
index 0000000..0433bd5
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerCheckRequest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * Simple class to wrap the parameters needed to check a container's health
+ * in ReplicationManager.
+ */
+public final class ContainerCheckRequest {
+
+  private final ContainerInfo containerInfo;
+  private final Set<ContainerReplica> containerReplicas;
+  private final List<ContainerReplicaOp> pendingOps;
+  private final int maintenanceRedundancy;
+  private final ReplicationManagerReport report;
+  private final Queue<ContainerHealthResult.UnderReplicatedHealthResult>
+      underRepQueue;
+  private final Queue<ContainerHealthResult.OverReplicatedHealthResult>
+      overRepQueue;
+
+
+  private ContainerCheckRequest(Builder builder) {
+    this.containerInfo = builder.containerInfo;
+    this.containerReplicas = builder.containerReplicas;
+    this.pendingOps = builder.pendingOps;
+    this.maintenanceRedundancy = builder.maintenanceRedundancy;
+    this.report = builder.report;
+    this.overRepQueue = builder.overRepQueue;
+    this.underRepQueue = builder.underRepQueue;
+  }
+
+  public List<ContainerReplicaOp> getPendingOps() {
+    return pendingOps;
+  }
+
+  public int getMaintenanceRedundancy() {
+    return maintenanceRedundancy;
+  }
+
+  public Set<ContainerReplica> getContainerReplicas() {
+    return containerReplicas;
+  }
+
+  public ContainerInfo getContainerInfo() {
+    return containerInfo;
+  }
+
+  public ReplicationManagerReport getReport() {
+    return report;
+  }
+
+  public Queue<ContainerHealthResult.UnderReplicatedHealthResult>
+      getUnderRepQueue() {
+    return underRepQueue;
+  }
+
+  public Queue<ContainerHealthResult.OverReplicatedHealthResult>
+      getOverRepQueue() {
+    return overRepQueue;
+  }
+
+  /**
+   * Builder class for ContainerCheckRequest.
+   */
+  public static class Builder {
+
+    private ContainerInfo containerInfo;
+    private Set<ContainerReplica> containerReplicas;
+    private List<ContainerReplicaOp> pendingOps;
+    private int maintenanceRedundancy;
+    private ReplicationManagerReport report;
+    private Queue<ContainerHealthResult.UnderReplicatedHealthResult>
+        underRepQueue;
+    private Queue<ContainerHealthResult.OverReplicatedHealthResult>
+        overRepQueue;
+
+    public Builder setContainerInfo(ContainerInfo containerInfo) {
+      this.containerInfo = containerInfo;
+      return this;
+    }
+
+    public Builder setContainerReplicas(
+        Set<ContainerReplica> containerReplicas) {
+      this.containerReplicas = containerReplicas;
+      return this;
+    }
+
+    public Builder setPendingOps(List<ContainerReplicaOp> pendingOps) {
+      this.pendingOps = pendingOps;
+      return this;
+    }
+
+    public Builder setMaintenanceRedundancy(int maintenanceRedundancy) {
+      this.maintenanceRedundancy = maintenanceRedundancy;
+      return this;
+    }
+
+    public Builder setUnderRepQueue(
+        Queue<ContainerHealthResult.UnderReplicatedHealthResult> queue) {
+      this.underRepQueue = queue;
+      return this;
+    }
+
+    public Builder setOverRepQueue(
+        Queue<ContainerHealthResult.OverReplicatedHealthResult> queue) {
+      this.overRepQueue = queue;
+      return this;
+    }
+
+    public Builder setReport(ReplicationManagerReport report) {
+      this.report = report;
+      return this;
+    }
+
+    public ContainerCheckRequest build() {
+      return new ContainerCheckRequest(this);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthCheck.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthCheck.java
deleted file mode 100644
index 5d14d04..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthCheck.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-
-import java.util.List;
-import java.util.Set;
-
-/**
- * Interface used by ReplicationManager to check if containers are healthy or
- * not.
- */
-public interface ContainerHealthCheck {
-
-  ContainerHealthResult checkHealth(
-      ContainerInfo container, Set<ContainerReplica> replicas,
-      List<ContainerReplicaOp> replicaPendingOps,
-      int remainingRedundancyForMaintenance);
-}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
index 70ec74b..de29029 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
@@ -41,7 +41,8 @@
   private final HealthState healthState;
   private final List<SCMCommand> commands = new ArrayList<>();
 
-  ContainerHealthResult(ContainerInfo containerInfo, HealthState healthState) {
+  public ContainerHealthResult(ContainerInfo containerInfo,
+      HealthState healthState) {
     this.containerInfo = containerInfo;
     this.healthState = healthState;
   }
@@ -67,7 +68,7 @@
    */
   public static class HealthyResult extends ContainerHealthResult {
 
-    HealthyResult(ContainerInfo containerInfo) {
+    public HealthyResult(ContainerInfo containerInfo) {
       super(containerInfo, HealthState.HEALTHY);
     }
   }
@@ -108,7 +109,7 @@
     private final boolean unrecoverable;
     private int requeueCount = 0;
 
-    UnderReplicatedHealthResult(ContainerInfo containerInfo,
+    public UnderReplicatedHealthResult(ContainerInfo containerInfo,
         int remainingRedundancy, boolean dueToDecommission,
         boolean replicatedOkWithPending, boolean unrecoverable) {
       super(containerInfo, HealthState.UNDER_REPLICATED);
@@ -207,7 +208,7 @@
     private final boolean sufficientlyReplicatedAfterPending;
 
 
-    OverReplicatedHealthResult(ContainerInfo containerInfo,
+    public OverReplicatedHealthResult(ContainerInfo containerInfo,
         int excessRedundancy, boolean replicatedOkWithPending) {
       super(containerInfo, HealthState.OVER_REPLICATED);
       this.excessRedundancy = excessRedundancy;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerHealthCheck.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerHealthCheck.java
deleted file mode 100644
index 092cc13..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerHealthCheck.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-
-import java.util.List;
-import java.util.Set;
-
-/**
- * Class to determine the health state of an EC Container. Given the container
- * and current replica details, along with replicas pending add and delete,
- * this class will return a ContainerHealthResult indicating if the container
- * is healthy, or under / over replicated etc.
- *
- * For EC Containers, it is possible for a container to be both under and over
- * replicated, if there are multiple copies of one index and no copies of
- * another. This class only returns a single status, keeping the container in a
- * single health state at any given time. Under replicated is a more serious
- * state than over replicated, so it will take precedence over any over
- * replication.
- */
-public class ECContainerHealthCheck implements ContainerHealthCheck {
-
-  // TODO - mis-replicated containers are not yet handled.
-  // TODO - should this class handle empty / deleting containers, or would it
-  //        be better handled elsewhere?
-
-  @Override
-  public ContainerHealthResult checkHealth(ContainerInfo container,
-      Set<ContainerReplica> replicas,
-      List<ContainerReplicaOp> replicaPendingOps,
-      int remainingRedundancyForMaintenance) {
-    ECContainerReplicaCount replicaCount =
-        new ECContainerReplicaCount(container, replicas, replicaPendingOps,
-            remainingRedundancyForMaintenance);
-
-    ECReplicationConfig repConfig =
-        (ECReplicationConfig) container.getReplicationConfig();
-
-    if (!replicaCount.isSufficientlyReplicated(false)) {
-      List<Integer> missingIndexes = replicaCount.unavailableIndexes(false);
-      int remainingRedundancy = repConfig.getParity();
-      boolean dueToDecommission = true;
-      if (missingIndexes.size() > 0) {
-        // The container has reduced redundancy and will need reconstructed
-        // via an EC reconstruction command. Note that it may also have some
-        // replicas in decommission / maintenance states, but as the under
-        // replication is not caused only by decommission, we say it is not
-        // due to decommission/
-        dueToDecommission = false;
-        remainingRedundancy = repConfig.getParity() - missingIndexes.size();
-      }
-      return new ContainerHealthResult.UnderReplicatedHealthResult(
-          container, remainingRedundancy, dueToDecommission,
-          replicaCount.isSufficientlyReplicated(true),
-          replicaCount.isUnrecoverable());
-    }
-
-    if (replicaCount.isOverReplicated(false)) {
-      List<Integer> overRepIndexes = replicaCount.overReplicatedIndexes(false);
-      return new ContainerHealthResult
-          .OverReplicatedHealthResult(container, overRepIndexes.size(),
-          !replicaCount.isOverReplicated(true));
-    }
-
-    // No issues detected, so return healthy.
-    return new ContainerHealthResult.HealthyResult(container);
-  }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
index 98dda59..4fba8be 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.replication.health.ECReplicationCheckHandler;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -47,14 +48,15 @@
   public static final Logger LOG =
       LoggerFactory.getLogger(ECOverReplicationHandler.class);
 
-  private final ECContainerHealthCheck ecContainerHealthCheck =
-      new ECContainerHealthCheck();
+  private final ECReplicationCheckHandler ecReplicationCheck;
   private final NodeManager nodeManager;
 
-  public ECOverReplicationHandler(PlacementPolicy placementPolicy,
-                                  NodeManager nodeManager) {
+  public ECOverReplicationHandler(ECReplicationCheckHandler ecReplicationCheck,
+      PlacementPolicy placementPolicy, NodeManager nodeManager) {
     super(placementPolicy);
+    this.ecReplicationCheck = ecReplicationCheck;
     this.nodeManager = nodeManager;
+
   }
 
   /**
@@ -74,9 +76,15 @@
       Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
       ContainerHealthResult result, int remainingMaintenanceRedundancy) {
     ContainerInfo container = result.getContainerInfo();
-    ContainerHealthResult currentUnderRepRes = ecContainerHealthCheck
-        .checkHealth(container, replicas, pendingOps,
-            remainingMaintenanceRedundancy);
+
+    ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+        .setContainerInfo(container)
+        .setContainerReplicas(replicas)
+        .setPendingOps(pendingOps)
+        .setMaintenanceRedundancy(remainingMaintenanceRedundancy)
+        .build();
+    ContainerHealthResult currentUnderRepRes = ecReplicationCheck
+        .checkHealth(request);
     LOG.debug("Handling over-replicated EC container: {}", container);
 
     //sanity check
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 9b8014f..5feb160 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.replication.health.ECReplicationCheckHandler;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
@@ -57,15 +58,15 @@
 
   public static final Logger LOG =
       LoggerFactory.getLogger(ECUnderReplicationHandler.class);
-  private final ECContainerHealthCheck ecContainerHealthCheck =
-      new ECContainerHealthCheck();
+  private final ECReplicationCheckHandler ecReplicationCheck;
   private final PlacementPolicy containerPlacement;
   private final long currentContainerSize;
   private final NodeManager nodeManager;
 
-  public ECUnderReplicationHandler(
+  public ECUnderReplicationHandler(ECReplicationCheckHandler ecReplicationCheck,
       final PlacementPolicy containerPlacement, final ConfigurationSource conf,
-      NodeManager nodeManager) {
+                                   NodeManager nodeManager) {
+    this.ecReplicationCheck = ecReplicationCheck;
     this.containerPlacement = containerPlacement;
     this.currentContainerSize = (long) conf
         .getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
@@ -102,9 +103,15 @@
         new ECContainerReplicaCount(container, replicas, pendingOps,
             remainingMaintenanceRedundancy);
 
-    ContainerHealthResult currentUnderRepRes = ecContainerHealthCheck
-        .checkHealth(container, replicas, pendingOps,
-            remainingMaintenanceRedundancy);
+    ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+        .setContainerInfo(container)
+        .setContainerReplicas(replicas)
+        .setPendingOps(pendingOps)
+        .setMaintenanceRedundancy(remainingMaintenanceRedundancy)
+        .build();
+
+    ContainerHealthResult currentUnderRepRes = ecReplicationCheck
+        .checkHealth(request);
 
     LOG.debug("Handling under-replicated EC container: {}", container);
     if (currentUnderRepRes
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 3017a71..699d523 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -27,7 +27,6 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -36,7 +35,10 @@
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 
 import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
-import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport.HealthState;
+import org.apache.hadoop.hdds.scm.container.replication.health.ClosedWithMismatchedReplicasHandler;
+import org.apache.hadoop.hdds.scm.container.replication.health.ECReplicationCheckHandler;
+import org.apache.hadoop.hdds.scm.container.replication.health.HealthCheck;
+import org.apache.hadoop.hdds.scm.container.replication.health.OpenContainerHandler;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMService;
@@ -57,7 +59,6 @@
 import java.time.Clock;
 import java.time.Duration;
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -69,7 +70,6 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
 import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
@@ -143,7 +143,7 @@
   private long lastTimeToBeReadyInMillis = 0;
   private final Clock clock;
   private final ContainerReplicaPendingOps containerReplicaPendingOps;
-  private final ContainerHealthCheck ecContainerHealthCheck;
+  private final ECReplicationCheckHandler ecReplicationCheckHandler;
   private final EventPublisher eventPublisher;
   private final ReentrantLock lock = new ReentrantLock();
   private Queue<ContainerHealthResult.UnderReplicatedHealthResult>
@@ -157,6 +157,7 @@
   private Thread overReplicatedProcessorThread;
   private final UnderReplicatedProcessor underReplicatedProcessor;
   private final OverReplicatedProcessor overReplicatedProcessor;
+  private final HealthCheck containerCheckChain;
 
   /**
    * Constructs ReplicationManager instance with the given configuration.
@@ -191,21 +192,28 @@
         TimeUnit.MILLISECONDS);
     this.containerReplicaPendingOps = replicaPendingOps;
     this.legacyReplicationManager = legacyReplicationManager;
-    this.ecContainerHealthCheck = new ECContainerHealthCheck();
+    this.ecReplicationCheckHandler = new ECReplicationCheckHandler();
     this.nodeManager = nodeManager;
     this.underRepQueue = createUnderReplicatedQueue();
     this.overRepQueue = new LinkedList<>();
     this.maintenanceRedundancy = rmConf.maintenanceRemainingRedundancy;
     ecUnderReplicationHandler = new ECUnderReplicationHandler(
-        containerPlacement, conf, nodeManager);
+        ecReplicationCheckHandler, containerPlacement, conf, nodeManager);
     ecOverReplicationHandler =
-        new ECOverReplicationHandler(containerPlacement, nodeManager);
+        new ECOverReplicationHandler(ecReplicationCheckHandler,
+            containerPlacement, nodeManager);
     underReplicatedProcessor =
         new UnderReplicatedProcessor(this, containerReplicaPendingOps,
             eventPublisher, rmConf.getUnderReplicatedInterval());
     overReplicatedProcessor =
         new OverReplicatedProcessor(this, containerReplicaPendingOps,
             eventPublisher, rmConf.getOverReplicatedInterval());
+
+    // Chain together the series of checks that are needed to validate the
+    // containers when they are checked by RM.
+    containerCheckChain = new OpenContainerHandler(this);
+    containerCheckChain.addNext(new ClosedWithMismatchedReplicasHandler(this))
+        .addNext(ecReplicationCheckHandler);
     start();
   }
 
@@ -358,6 +366,10 @@
     }
   }
 
+  public void sendCloseContainerEvent(ContainerID containerID) {
+    eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
+  }
+
   /**
    * Add an under replicated container back to the queue if it was unable to
    * be processed. Its retry count will be incremented before it is re-queued,
@@ -419,7 +431,7 @@
     return scmContext.getTermOfLeader();
   }
 
-  protected ContainerHealthResult processContainer(ContainerInfo containerInfo,
+  protected void processContainer(ContainerInfo containerInfo,
       Queue<ContainerHealthResult.UnderReplicatedHealthResult> underRep,
       Queue<ContainerHealthResult.OverReplicatedHealthResult> overRep,
       ReplicationManagerReport report) throws ContainerNotFoundException {
@@ -427,77 +439,25 @@
     ContainerID containerID = containerInfo.containerID();
     Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
         containerID);
-
-    if (containerInfo.getState() == HddsProtos.LifeCycleState.OPEN) {
-      if (!isOpenContainerHealthy(containerInfo, replicas)) {
-        report.incrementAndSample(
-            HealthState.OPEN_UNHEALTHY, containerID);
-        eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
-        return new ContainerHealthResult.UnHealthyResult(containerInfo);
-      }
-      return new ContainerHealthResult.HealthyResult(containerInfo);
-    }
-
-    if (containerInfo.getState() == HddsProtos.LifeCycleState.CLOSED) {
-      List<ContainerReplica> unhealthyReplicas = replicas.stream()
-          .filter(r -> !compareState(containerInfo.getState(), r.getState()))
-          .collect(Collectors.toList());
-
-      if (unhealthyReplicas.size() > 0) {
-        handleUnhealthyReplicas(containerInfo, unhealthyReplicas);
-      }
-    }
-
     List<ContainerReplicaOp> pendingOps =
         containerReplicaPendingOps.getPendingOps(containerID);
-    ContainerHealthResult health = ecContainerHealthCheck.checkHealth(
-        containerInfo, replicas, pendingOps, maintenanceRedundancy);
-      // TODO - should the report have a HEALTHY state, rather than just bad
-      //        states? It would need to be added to legacy RM too.
-    if (health.getHealthState()
-        == ContainerHealthResult.HealthState.UNDER_REPLICATED) {
-      report.incrementAndSample(HealthState.UNDER_REPLICATED, containerID);
-      ContainerHealthResult.UnderReplicatedHealthResult underHealth
-          = ((ContainerHealthResult.UnderReplicatedHealthResult) health);
-      if (underHealth.isUnrecoverable()) {
-        // TODO - do we need a new health state for unrecoverable EC?
-        report.incrementAndSample(HealthState.MISSING, containerID);
-      }
-      if (!underHealth.isSufficientlyReplicatedAfterPending() &&
-          !underHealth.isUnrecoverable()) {
-        underRep.add(underHealth);
-      }
-    } else if (health.getHealthState()
-        == ContainerHealthResult.HealthState.OVER_REPLICATED) {
-      report.incrementAndSample(HealthState.OVER_REPLICATED, containerID);
-      ContainerHealthResult.OverReplicatedHealthResult overHealth
-          = ((ContainerHealthResult.OverReplicatedHealthResult) health);
-      if (!overHealth.isSufficientlyReplicatedAfterPending()) {
-        overRep.add(overHealth);
-      }
-    }
-    return health;
-  }
 
-  /**
-   * Handles unhealthy container.
-   * A container is inconsistent if any of the replica state doesn't
-   * match the container state. We have to take appropriate action
-   * based on state of the replica.
-   *
-   * @param container ContainerInfo
-   * @param unhealthyReplicas List of ContainerReplica
-   */
-  private void handleUnhealthyReplicas(final ContainerInfo container,
-      List<ContainerReplica> unhealthyReplicas) {
-    Iterator<ContainerReplica> iterator = unhealthyReplicas.iterator();
-    while (iterator.hasNext()) {
-      final ContainerReplica replica = iterator.next();
-      final ContainerReplicaProto.State state = replica.getState();
-      if (state == State.OPEN || state == State.CLOSING) {
-        sendCloseCommand(container, replica.getDatanodeDetails(), true);
-        iterator.remove();
-      }
+    ContainerCheckRequest checkRequest = new ContainerCheckRequest.Builder()
+        .setContainerInfo(containerInfo)
+        .setContainerReplicas(replicas)
+        .setMaintenanceRedundancy(maintenanceRedundancy)
+        .setReport(report)
+        .setPendingOps(pendingOps)
+        .setUnderRepQueue(underRep)
+        .setOverRepQueue(overRep)
+        .build();
+    // This will call the chain of container health handlers in turn which
+    // will issue commands as needed, update the report and perhaps add
+    // containers to the over and under replicated queue.
+    boolean handled = containerCheckChain.handleChain(checkRequest);
+    if (!handled) {
+      LOG.debug("Container {} had no actions after passing through the " +
+          "check chain", containerInfo.containerID());
     }
   }
 
@@ -510,7 +470,7 @@
    *                  has to be closed
    * @param force Should be set to true if we want to force close.
    */
-  private void sendCloseCommand(final ContainerInfo container,
+  public void sendCloseContainerReplicaCommand(final ContainerInfo container,
       final DatanodeDetails datanode, final boolean force) {
 
     ContainerID containerID = container.containerID();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/AbstractCheck.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/AbstractCheck.java
new file mode 100644
index 0000000..5fd83ef
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/AbstractCheck.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+
+/**
+ * Abstract base class for Container Health Checks to extend.
+ */
+public abstract class AbstractCheck implements HealthCheck {
+
+  private HealthCheck successor = null;
+
+  /**
+   * Handle the container and apply the given check to it. This may result in
+   * commands getting generated to correct container states, or nothing
+   * happening if the container is healthy.
+   * @param request ContainerCheckRequest object representing the container
+   * @return True if the request was handled or false if it was not and should
+   *         be handled by the next handler in the chain.
+   */
+  public boolean handleChain(ContainerCheckRequest request) {
+    boolean result = handle(request);
+    if (!result && successor != null) {
+      return successor.handleChain(request);
+    }
+    return result;
+  }
+
+  /**
+   * Add a subsequent HealthCheck that will be tried only if the current check
+   * returns false. This allows handlers to be chained together, and each will
+   * be tried in turn until one succeeds.
+   * @param healthCheck The HealthCheck to add to the successor, which will be
+   *                    tried if the current check returns false.
+   * @return The passed in health check, so the next check in the chain can be
+   *         easily added to the successor.
+   */
+  @Override
+  public HealthCheck addNext(HealthCheck healthCheck) {
+    successor = healthCheck;
+    return healthCheck;
+  }
+
+  /**
+   * Handle the container and apply the given check to it. This may result in
+   * commands getting generated to correct container states, or nothing
+   * happening if the container is healthy.
+   * @param request ContainerCheckRequest object representing the container
+   * @return True if the request was handled or false if it was not and should
+   *         be handled by the next handler in the chain.
+   */
+  @Override
+  public abstract boolean handle(ContainerCheckRequest request);
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosedWithMismatchedReplicasHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosedWithMismatchedReplicasHandler.java
new file mode 100644
index 0000000..f5bb193
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosedWithMismatchedReplicasHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Handler to process containers which are closed, but some replicas are still
+ * open or closing. This handler will send a command to the datanodes for each
+ * mis-matched replica to close it.
+ */
+public class ClosedWithMismatchedReplicasHandler extends AbstractCheck {
+
+  private ReplicationManager replicationManager;
+
+  public ClosedWithMismatchedReplicasHandler(
+      ReplicationManager replicationManager) {
+    this.replicationManager = replicationManager;
+  }
+
+  @Override
+  public boolean handle(ContainerCheckRequest request) {
+    ContainerInfo containerInfo = request.getContainerInfo();
+    Set<ContainerReplica> replicas = request.getContainerReplicas();
+    if (containerInfo.getState() != HddsProtos.LifeCycleState.CLOSED) {
+      // Handler is only relevant for CLOSED containers.
+      return false;
+    }
+    List<ContainerReplica> unhealthyReplicas = replicas.stream()
+        .filter(r -> !ReplicationManager
+            .compareState(containerInfo.getState(), r.getState()))
+        .collect(Collectors.toList());
+
+    if (unhealthyReplicas.size() > 0) {
+      handleUnhealthyReplicas(containerInfo, unhealthyReplicas);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Handles unhealthy container.
+   * A container is inconsistent if any of the replica state doesn't
+   * match the container state. We have to take appropriate action
+   * based on state of the replica.
+   *
+   * @param container ContainerInfo
+   * @param unhealthyReplicas List of ContainerReplica
+   */
+  private void handleUnhealthyReplicas(final ContainerInfo container,
+      List<ContainerReplica> unhealthyReplicas) {
+    Iterator<ContainerReplica> iterator = unhealthyReplicas.iterator();
+    while (iterator.hasNext()) {
+      final ContainerReplica replica = iterator.next();
+      final ContainerReplicaProto.State state = replica.getState();
+      if (state == ContainerReplicaProto.State.OPEN
+          || state == ContainerReplicaProto.State.CLOSING) {
+        replicationManager.sendCloseContainerReplicaCommand(
+            container, replica.getDatanodeDetails(), true);
+        iterator.remove();
+      }
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
new file mode 100644
index 0000000..67936c8
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ECContainerReplicaCount;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
+
+/**
+ * Container Check handler to check the under / over replication state for
+ * EC containers. If any containers are found to be over or under replicated
+ * they are added to the queue passed within the request object.
+ */
+public class ECReplicationCheckHandler extends AbstractCheck {
+
+  public ECReplicationCheckHandler() {
+  }
+
+  @Override
+  public boolean handle(ContainerCheckRequest request) {
+    if (request.getContainerInfo().getReplicationType() != EC) {
+      // This handler is only for EC containers.
+      return false;
+    }
+    ReplicationManagerReport report = request.getReport();
+    ContainerInfo container = request.getContainerInfo();
+    ContainerID containerID = container.containerID();
+    ContainerHealthResult health = checkHealth(request);
+    if (health.getHealthState() == ContainerHealthResult.HealthState.HEALTHY) {
+      // If the container is healthy, there is nothing else to do in this
+      // handler so return as unhandled so any further handlers will be tried.
+      return false;
+    }
+    // TODO - should the report have a HEALTHY state, rather than just bad
+    //        states? It would need to be added to legacy RM too.
+    if (health.getHealthState()
+        == ContainerHealthResult.HealthState.UNDER_REPLICATED) {
+      report.incrementAndSample(
+          ReplicationManagerReport.HealthState.UNDER_REPLICATED, containerID);
+      ContainerHealthResult.UnderReplicatedHealthResult underHealth
+          = ((ContainerHealthResult.UnderReplicatedHealthResult) health);
+      if (underHealth.isUnrecoverable()) {
+        // TODO - do we need a new health state for unrecoverable EC?
+        report.incrementAndSample(
+            ReplicationManagerReport.HealthState.MISSING, containerID);
+      }
+      // TODO - if it is unrecoverable, should we return false to other
+      //        handlers can be tried?
+      if (!underHealth.isSufficientlyReplicatedAfterPending() &&
+          !underHealth.isUnrecoverable()) {
+        request.getUnderRepQueue().add(underHealth);
+      }
+      return true;
+    } else if (health.getHealthState()
+        == ContainerHealthResult.HealthState.OVER_REPLICATED) {
+      report.incrementAndSample(
+          ReplicationManagerReport.HealthState.OVER_REPLICATED, containerID);
+      ContainerHealthResult.OverReplicatedHealthResult overHealth
+          = ((ContainerHealthResult.OverReplicatedHealthResult) health);
+      if (!overHealth.isSufficientlyReplicatedAfterPending()) {
+        request.getOverRepQueue().add(overHealth);
+      }
+      return true;
+    }
+    // Should not get here, but incase it does the container is not healthy,
+    // but is also not under or over replicated.
+    return false;
+  }
+
+  public ContainerHealthResult checkHealth(ContainerCheckRequest request) {
+    ContainerInfo container = request.getContainerInfo();
+    Set<ContainerReplica> replicas = request.getContainerReplicas();
+    List<ContainerReplicaOp> replicaPendingOps = request.getPendingOps();
+    ECContainerReplicaCount replicaCount =
+        new ECContainerReplicaCount(container, replicas, replicaPendingOps,
+            request.getMaintenanceRedundancy());
+
+    ECReplicationConfig repConfig =
+        (ECReplicationConfig) container.getReplicationConfig();
+
+    if (!replicaCount.isSufficientlyReplicated(false)) {
+      List<Integer> missingIndexes = replicaCount.unavailableIndexes(false);
+      int remainingRedundancy = repConfig.getParity();
+      boolean dueToDecommission = true;
+      if (missingIndexes.size() > 0) {
+        // The container has reduced redundancy and will need reconstructed
+        // via an EC reconstruction command. Note that it may also have some
+        // replicas in decommission / maintenance states, but as the under
+        // replication is not caused only by decommission, we say it is not
+        // due to decommission/
+        dueToDecommission = false;
+        remainingRedundancy = repConfig.getParity() - missingIndexes.size();
+      }
+      return new ContainerHealthResult.UnderReplicatedHealthResult(
+          container, remainingRedundancy, dueToDecommission,
+          replicaCount.isSufficientlyReplicated(true),
+          replicaCount.isUnrecoverable());
+    }
+
+    if (replicaCount.isOverReplicated(false)) {
+      List<Integer> overRepIndexes = replicaCount.overReplicatedIndexes(false);
+      return new ContainerHealthResult
+          .OverReplicatedHealthResult(container, overRepIndexes.size(),
+          !replicaCount.isOverReplicated(true));
+    }
+    // No issues detected, so return healthy.
+    return new ContainerHealthResult.HealthyResult(container);
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/HealthCheck.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/HealthCheck.java
new file mode 100644
index 0000000..a95c0d3
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/HealthCheck.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+
+/**
+ * Interface used by Container Health Check Handlers.
+ */
+public interface HealthCheck {
+
+  /**
+   * Handle the container and apply the given check to it. This may result in
+   * commands getting generated to correct container states, or nothing
+   * happening if the container is healthy.
+   * @param request ContainerCheckRequest object representing the container
+   * @return True if the request was handled or false if it was not and should
+   *         be handled by the next handler in the chain.
+   */
+  boolean handle(ContainerCheckRequest request);
+
+  /**
+   * Starting from this HealthCheck, call the handle method. If it returns
+   * false, indicating the request was not handled, then forward the request to
+   * the next handler in the chain via its handleChain method. Repeating until
+   * the request is handled, or there are no further handlers to try.
+   * @param request
+   * @return True if the request was handled or false if not handler handled it.
+   */
+  boolean handleChain(ContainerCheckRequest request);
+
+  /**
+   * Add a subsequent HealthCheck that will be tried only if the current check
+   * returns false. This allows handlers to be chained together, and each will
+   * be tried in turn until one succeeds.
+   * @param handler
+   * @return
+   */
+  HealthCheck addNext(HealthCheck handler);
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/OpenContainerHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/OpenContainerHandler.java
new file mode 100644
index 0000000..666b1a2
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/OpenContainerHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.compareState;
+
+/**
+ * Class used in Replication Manager to check open container health for both
+ * EC and Ratis containers. Healthy open containers are skipped, while
+ * containers where some replicas are not in the same state as the container
+ * will be closed.
+ */
+public class OpenContainerHandler extends AbstractCheck {
+
+  private ReplicationManager replicationManager;
+
+  public OpenContainerHandler(ReplicationManager replicationManager) {
+    this.replicationManager = replicationManager;
+  }
+
+  @Override
+  public boolean handle(ContainerCheckRequest request) {
+    ContainerInfo containerInfo = request.getContainerInfo();
+    if (containerInfo.getState() == HddsProtos.LifeCycleState.OPEN) {
+      if (!isOpenContainerHealthy(
+          containerInfo, request.getContainerReplicas())) {
+        // This is an unhealthy open container, so we need to trigger the
+        // close process on it.
+        request.getReport().incrementAndSample(
+            ReplicationManagerReport.HealthState.OPEN_UNHEALTHY,
+            containerInfo.containerID());
+        replicationManager.sendCloseContainerEvent(containerInfo.containerID());
+        return true;
+      }
+      // For open containers we do not want to do any further processing in RM
+      // so return true to stop the command chain.
+      return true;
+    }
+    // The container is not open, so we return false to let the next handler in
+    // the chain process it.
+    return false;
+  }
+
+  /**
+   * An open container is healthy if all its replicas are in the same state as
+   * the container.
+   * @param container The container to check
+   * @param replicas The replicas belonging to the container
+   * @return True if the container is healthy, false otherwise
+   */
+  private boolean isOpenContainerHealthy(
+      ContainerInfo container, Set< ContainerReplica > replicas) {
+    HddsProtos.LifeCycleState state = container.getState();
+    return replicas.stream()
+        .allMatch(r -> compareState(state, r.getState()));
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/package-info.java
new file mode 100644
index 0000000..cf8396d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+/**
+ * HDDS Container replicaton check classes.
+ */
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerHealthCheck.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerHealthCheck.java
deleted file mode 100644
index d8060da..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerHealthCheck.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.OverReplicatedHealthResult;
-import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
-import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.HealthState;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
-import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD;
-import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
-import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
-import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
-
-/**
- * Tests for the ECContainerHealthCheck class.
- */
-public class TestECContainerHealthCheck {
-
-  private ECContainerHealthCheck healthCheck;
-  private ECReplicationConfig repConfig;
-
-  @Before
-  public void setup() {
-    healthCheck = new ECContainerHealthCheck();
-    repConfig = new ECReplicationConfig(3, 2);
-  }
-
-  @Test
-  public void testHealthyContainerIsHealthy() {
-    ContainerInfo container = createContainerInfo(repConfig);
-    Set<ContainerReplica> replicas
-        = createReplicas(container.containerID(), 1, 2, 3, 4, 5);
-    ContainerHealthResult result = healthCheck.checkHealth(container, replicas,
-        Collections.emptyList(), 2);
-    Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
-  }
-
-  @Test
-  public void testUnderReplicatedContainerIsUnderReplicated() {
-    ContainerInfo container = createContainerInfo(repConfig);
-    Set<ContainerReplica> replicas
-        = createReplicas(container.containerID(), 1, 2, 4, 5);
-    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
-        healthCheck.checkHealth(container, replicas,
-            Collections.emptyList(), 2);
-    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
-    Assert.assertEquals(1, result.getRemainingRedundancy());
-    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
-    Assert.assertFalse(result.underReplicatedDueToDecommission());
-  }
-
-  @Test
-  public void testUnderReplicatedContainerFixedWithPending() {
-    ContainerInfo container = createContainerInfo(repConfig);
-    Set<ContainerReplica> replicas
-        = createReplicas(container.containerID(), 1, 2, 4, 5);
-    List<ContainerReplicaOp> pending = new ArrayList<>();
-    pending.add(ContainerReplicaOp.create(
-        ADD, MockDatanodeDetails.randomDatanodeDetails(), 3));
-    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
-        healthCheck.checkHealth(container, replicas, pending, 2);
-    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
-    Assert.assertEquals(1, result.getRemainingRedundancy());
-    Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
-    Assert.assertFalse(result.underReplicatedDueToDecommission());
-  }
-
-  @Test
-  public void testUnderReplicatedDueToDecommission() {
-    ContainerInfo container = createContainerInfo(repConfig);
-    Set<ContainerReplica> replicas = createReplicas(container.containerID(),
-        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
-        Pair.of(IN_SERVICE, 3), Pair.of(DECOMMISSIONING, 4),
-        Pair.of(DECOMMISSIONED, 5));
-
-    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
-        healthCheck.checkHealth(container, replicas, Collections.emptyList(),
-            2);
-    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
-    Assert.assertEquals(2, result.getRemainingRedundancy());
-    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
-    Assert.assertTrue(result.underReplicatedDueToDecommission());
-  }
-
-  @Test
-  public void testUnderReplicatedDueToDecommissionFixedWithPending() {
-    ContainerInfo container = createContainerInfo(repConfig);
-    Set<ContainerReplica> replicas = createReplicas(container.containerID(),
-        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
-        Pair.of(IN_SERVICE, 3), Pair.of(DECOMMISSIONING, 4),
-        Pair.of(IN_SERVICE, 4), Pair.of(DECOMMISSIONED, 5));
-    List<ContainerReplicaOp> pending = new ArrayList<>();
-    pending.add(ContainerReplicaOp.create(
-        ADD, MockDatanodeDetails.randomDatanodeDetails(), 5));
-
-    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
-        healthCheck.checkHealth(container, replicas, pending, 2);
-    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
-    Assert.assertEquals(2, result.getRemainingRedundancy());
-    Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
-    Assert.assertTrue(result.underReplicatedDueToDecommission());
-  }
-
-  @Test
-  public void testUnderReplicatedDueToDecommissionAndMissing() {
-    ContainerInfo container = createContainerInfo(repConfig);
-    Set<ContainerReplica> replicas = createReplicas(container.containerID(),
-        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
-        Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONED, 5));
-    List<ContainerReplicaOp> pending = new ArrayList<>();
-    pending.add(ContainerReplicaOp.create(
-        ADD, MockDatanodeDetails.randomDatanodeDetails(), 3));
-
-    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
-        healthCheck.checkHealth(container, replicas, pending, 2);
-    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
-    Assert.assertEquals(1, result.getRemainingRedundancy());
-    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
-    Assert.assertFalse(result.underReplicatedDueToDecommission());
-  }
-
-  @Test
-  public void testUnderReplicatedAndUnrecoverable() {
-    ContainerInfo container = createContainerInfo(repConfig);
-    Set<ContainerReplica> replicas = createReplicas(container.containerID(),
-        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
-
-    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
-        healthCheck.checkHealth(container, replicas,
-            Collections.emptyList(), 2);
-    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
-    Assert.assertEquals(-1, result.getRemainingRedundancy());
-    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
-    Assert.assertFalse(result.underReplicatedDueToDecommission());
-    Assert.assertTrue(result.isUnrecoverable());
-  }
-
-  @Test
-  public void testOverReplicatedContainer() {
-    ContainerInfo container = createContainerInfo(repConfig);
-    Set<ContainerReplica> replicas =  createReplicas(container.containerID(),
-        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
-        Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
-        Pair.of(IN_SERVICE, 5),
-        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
-
-    List<ContainerReplicaOp> pending = new ArrayList<>();
-    pending.add(ContainerReplicaOp.create(
-        DELETE, MockDatanodeDetails.randomDatanodeDetails(), 1));
-    pending.add(ContainerReplicaOp.create(
-        DELETE, MockDatanodeDetails.randomDatanodeDetails(), 2));
-
-    OverReplicatedHealthResult result = (OverReplicatedHealthResult)
-        healthCheck.checkHealth(container, replicas, pending, 2);
-    Assert.assertEquals(HealthState.OVER_REPLICATED, result.getHealthState());
-    Assert.assertEquals(2, result.getExcessRedundancy());
-    Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
-  }
-
-  @Test
-  public void testOverReplicatedContainerDueToMaintenance() {
-    ContainerInfo container = createContainerInfo(repConfig);
-    Set<ContainerReplica> replicas =  createReplicas(container.containerID(),
-        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
-        Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
-        Pair.of(IN_SERVICE, 5),
-        Pair.of(IN_MAINTENANCE, 1), Pair.of(IN_MAINTENANCE, 2));
-
-    ContainerHealthResult result = healthCheck.checkHealth(container, replicas,
-        Collections.emptyList(), 2);
-    Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
-  }
-
-  @Test
-  public void testOverAndUnderReplicated() {
-    ContainerInfo container = createContainerInfo(repConfig);
-    Set<ContainerReplica> replicas =  createReplicas(container.containerID(),
-        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
-        Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
-        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
-
-    ContainerHealthResult result = healthCheck.checkHealth(container, replicas,
-        Collections.emptyList(), 2);
-    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
-    Assert.assertEquals(1,
-        ((UnderReplicatedHealthResult)result).getRemainingRedundancy());
-  }
-
-}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
index 42db8ab..f96790a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.replication.health.ECReplicationCheckHandler;
 import org.apache.hadoop.hdds.scm.net.NodeSchema;
 import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -61,6 +62,7 @@
   private NodeManager nodeManager;
   private OzoneConfiguration conf;
   private PlacementPolicy policy;
+  private ECReplicationCheckHandler replicationCheck;
 
   @BeforeEach
   public void setup() {
@@ -80,6 +82,7 @@
     NodeSchema[] schemas =
         new NodeSchema[] {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
     NodeSchemaManager.getInstance().init(schemas, true);
+    replicationCheck = new ECReplicationCheckHandler();
   }
 
   @Test
@@ -126,7 +129,7 @@
       Set<ContainerReplica> availableReplicas,
       Map<Integer, Integer> index2excessNum) {
     ECOverReplicationHandler ecORH =
-        new ECOverReplicationHandler(policy, nodeManager);
+        new ECOverReplicationHandler(replicationCheck, policy, nodeManager);
     ContainerHealthResult.OverReplicatedHealthResult result =
         Mockito.mock(ContainerHealthResult.OverReplicatedHealthResult.class);
     Mockito.when(result.getContainerInfo()).thenReturn(container);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 87d4144..7c1c442 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.replication.health.ECReplicationCheckHandler;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.net.NodeSchema;
 import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
@@ -68,6 +69,7 @@
   private PlacementPolicy policy;
   private static final int DATA = 3;
   private static final int PARITY = 2;
+  private ECReplicationCheckHandler replicationCheck;
 
   @BeforeEach
   public void setup() {
@@ -87,6 +89,7 @@
     NodeSchema[] schemas =
         new NodeSchema[] {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
     NodeSchemaManager.getInstance().init(schemas, true);
+    replicationCheck = new ECReplicationCheckHandler();
   }
 
   @Test
@@ -260,7 +263,8 @@
       int decomIndexes, int maintenanceIndexes,
       PlacementPolicy placementPolicy) throws IOException {
     ECUnderReplicationHandler ecURH =
-        new ECUnderReplicationHandler(placementPolicy, conf, nodeManager);
+        new ECUnderReplicationHandler(replicationCheck,
+            placementPolicy, conf, nodeManager);
     ContainerHealthResult.UnderReplicatedHealthResult result =
         Mockito.mock(ContainerHealthResult.UnderReplicatedHealthResult.class);
     Mockito.when(result.isUnrecoverable()).thenReturn(false);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 82b49d5..6138176 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -139,10 +139,8 @@
         HddsProtos.LifeCycleState.OPEN);
     // It is under replicated, but as its still open it is seen as healthy.
     addReplicas(container, ContainerReplicaProto.State.OPEN, 1, 2, 3, 4);
-    ContainerHealthResult result = replicationManager.processContainer(
+    replicationManager.processContainer(
         container, underRep, overRep, repReport);
-    Assert.assertEquals(ContainerHealthResult.HealthState.HEALTHY,
-        result.getHealthState());
     Assert.assertEquals(0, underRep.size());
     Assert.assertEquals(0, overRep.size());
   }
@@ -154,10 +152,8 @@
         HddsProtos.LifeCycleState.OPEN);
     // Container is open but replicas are closed, so it is open but unhealthy.
     addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4);
-    ContainerHealthResult result = replicationManager.processContainer(
+    replicationManager.processContainer(
         container, underRep, overRep, repReport);
-    Assert.assertEquals(ContainerHealthResult.HealthState.UNHEALTHY,
-        result.getHealthState());
     Mockito.verify(eventPublisher, Mockito.times(1))
         .fireEvent(SCMEvents.CLOSE_CONTAINER, container.containerID());
     Assert.assertEquals(1, repReport.getStat(
@@ -172,10 +168,8 @@
         HddsProtos.LifeCycleState.CLOSED);
     addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4, 5);
 
-    ContainerHealthResult result = replicationManager.processContainer(
+    replicationManager.processContainer(
         container, underRep, overRep, repReport);
-    Assert.assertEquals(ContainerHealthResult.HealthState.HEALTHY,
-        result.getHealthState());
     Assert.assertEquals(0, underRep.size());
     Assert.assertEquals(0, overRep.size());
   }
@@ -186,10 +180,8 @@
         HddsProtos.LifeCycleState.CLOSED);
     addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4);
 
-    ContainerHealthResult result = replicationManager.processContainer(
+    replicationManager.processContainer(
         container, underRep, overRep, repReport);
-    Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED,
-        result.getHealthState());
     Assert.assertEquals(1, underRep.size());
     Assert.assertEquals(0, overRep.size());
     Assert.assertEquals(1, repReport.getStat(
@@ -205,18 +197,14 @@
     containerReplicaPendingOps.scheduleAddReplica(container.containerID(),
         MockDatanodeDetails.randomDatanodeDetails(), 5);
 
-    ContainerHealthResult result = replicationManager.processContainer(
+    replicationManager.processContainer(
         container, underRep, overRep, repReport);
-    Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED,
-        result.getHealthState());
     // As the pending replication fixes the under replication, nothing is added
     // to the under replication list.
     Assert.assertEquals(0, underRep.size());
     Assert.assertEquals(0, overRep.size());
-    Assert.assertTrue(((ContainerHealthResult.UnderReplicatedHealthResult)
-        result).isSufficientlyReplicatedAfterPending());
     // As the container is still under replicated, as the pending have not
-    // completed yet, the container is still marked as under-replciated in the
+    // completed yet, the container is still marked as under-replicated in the
     // report.
     Assert.assertEquals(1, repReport.getStat(
         ReplicationManagerReport.HealthState.UNDER_REPLICATED));
@@ -229,10 +217,8 @@
         HddsProtos.LifeCycleState.CLOSED);
     addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2);
 
-    ContainerHealthResult result = replicationManager.processContainer(
+    replicationManager.processContainer(
         container, underRep, overRep, repReport);
-    Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED,
-        result.getHealthState());
     // If it is unrecoverable, there is no point in putting it into the under
     // replication list. It will be checked again on the next RM run.
     Assert.assertEquals(0, underRep.size());
@@ -250,13 +236,11 @@
         HddsProtos.LifeCycleState.CLOSED);
     addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 5, 5);
 
-    ContainerHealthResult result = replicationManager.processContainer(
+    replicationManager.processContainer(
         container, underRep, overRep, repReport);
     // If it is both under and over replicated, we set it to the most important
     // state, which is under-replicated. When that is fixed, over replication
     // will be handled.
-    Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED,
-        result.getHealthState());
     Assert.assertEquals(1, underRep.size());
     Assert.assertEquals(0, overRep.size());
     Assert.assertEquals(1, repReport.getStat(
@@ -271,10 +255,8 @@
         HddsProtos.LifeCycleState.CLOSED);
     addReplicas(container, ContainerReplicaProto.State.CLOSED,
         1, 2, 3, 4, 5, 5);
-    ContainerHealthResult result = replicationManager.processContainer(
+    replicationManager.processContainer(
         container, underRep, overRep, repReport);
-    Assert.assertEquals(ContainerHealthResult.HealthState.OVER_REPLICATED,
-        result.getHealthState());
     Assert.assertEquals(0, underRep.size());
     Assert.assertEquals(1, overRep.size());
     Assert.assertEquals(1, repReport.getStat(
@@ -290,10 +272,8 @@
         1, 2, 3, 4, 5, 5);
     containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(),
         MockDatanodeDetails.randomDatanodeDetails(), 5);
-    ContainerHealthResult result = replicationManager.processContainer(
+    replicationManager.processContainer(
         container, underRep, overRep, repReport);
-    Assert.assertEquals(ContainerHealthResult.HealthState.OVER_REPLICATED,
-        result.getHealthState());
     Assert.assertEquals(0, underRep.size());
     // If the pending replication fixes the over-replication, nothing is added
     // to the over replication list.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestClosedWithMismatchedReplicasHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestClosedWithMismatchedReplicasHandler.java
new file mode 100644
index 0000000..5a1e399
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestClosedWithMismatchedReplicasHandler.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.times;
+
+/**
+ * Tests for the ClosedWithMismatchedReplicasHandler.
+ */
+public class TestClosedWithMismatchedReplicasHandler {
+
+  private ReplicationManager replicationManager;
+  private ClosedWithMismatchedReplicasHandler handler;
+  private ECReplicationConfig replicationConfig;
+
+  @BeforeEach
+  public void setup() {
+    replicationConfig = new ECReplicationConfig(3, 2);
+    replicationManager = Mockito.mock(ReplicationManager.class);
+    handler = new ClosedWithMismatchedReplicasHandler(replicationManager);
+  }
+
+  @Test
+  public void testOpenContainerReturnsFalse() {
+    ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+        replicationConfig, 1, OPEN);
+    ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+        .setPendingOps(Collections.EMPTY_LIST)
+        .setReport(new ReplicationManagerReport())
+        .setContainerInfo(containerInfo)
+        .setContainerReplicas(Collections.emptySet())
+        .build();
+
+    Assertions.assertFalse(handler.handle(request));
+    Mockito.verify(replicationManager, times(0))
+        .sendCloseContainerReplicaCommand(
+            any(), any(), anyBoolean());
+  }
+
+  @Test
+  public void testClosedHealthyContainerReturnsFalse() {
+    ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+        replicationConfig, 1, CLOSED);
+    Set<ContainerReplica> containerReplicas =
+        ReplicationTestUtil.createReplicas(containerInfo.containerID(),
+            ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4, 5);
+    ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+        .setPendingOps(Collections.EMPTY_LIST)
+        .setReport(new ReplicationManagerReport())
+        .setContainerInfo(containerInfo)
+        .setContainerReplicas(containerReplicas)
+        .build();
+    Assertions.assertFalse(handler.handle(request));
+
+    Mockito.verify(replicationManager, times(0))
+        .sendCloseContainerReplicaCommand(
+          any(), any(), anyBoolean());
+  }
+
+  @Test
+  public void testClosedMissMatchContainerReturnsTrue() {
+    ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+        replicationConfig, 1, CLOSED);
+    Set<ContainerReplica> containerReplicas = ReplicationTestUtil
+        .createReplicas(containerInfo.containerID(),
+            ContainerReplicaProto.State.CLOSED, 1, 2);
+    ContainerReplica mismatch1 = ReplicationTestUtil.createContainerReplica(
+        containerInfo.containerID(), 4,
+        HddsProtos.NodeOperationalState.IN_SERVICE,
+        ContainerReplicaProto.State.OPEN);
+    ContainerReplica mismatch2 = ReplicationTestUtil.createContainerReplica(
+        containerInfo.containerID(), 5,
+        HddsProtos.NodeOperationalState.IN_SERVICE,
+        ContainerReplicaProto.State.CLOSING);
+    ContainerReplica mismatch3 = ReplicationTestUtil.createContainerReplica(
+        containerInfo.containerID(), 3,
+        HddsProtos.NodeOperationalState.IN_SERVICE,
+        ContainerReplicaProto.State.UNHEALTHY);
+    containerReplicas.add(mismatch1);
+    containerReplicas.add(mismatch2);
+    containerReplicas.add(mismatch3);
+    ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+        .setPendingOps(Collections.EMPTY_LIST)
+        .setReport(new ReplicationManagerReport())
+        .setContainerInfo(containerInfo)
+        .setContainerReplicas(containerReplicas)
+        .build();
+    Assertions.assertTrue(handler.handle(request));
+
+    Mockito.verify(replicationManager, times(1))
+        .sendCloseContainerReplicaCommand(
+            containerInfo, mismatch1.getDatanodeDetails(), true);
+    Mockito.verify(replicationManager, times(1))
+        .sendCloseContainerReplicaCommand(
+            containerInfo, mismatch2.getDatanodeDetails(), true);
+    Mockito.verify(replicationManager, times(0))
+        .sendCloseContainerReplicaCommand(
+            containerInfo, mismatch3.getDatanodeDetails(), true);
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
new file mode 100644
index 0000000..6222c23
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.OverReplicatedHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.HealthState;
+
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
+
+/**
+ * Tests for the ECContainerHealthCheck class.
+ */
+public class TestECReplicationCheckHandler {
+
+  private ECReplicationCheckHandler healthCheck;
+  private ECReplicationConfig repConfig;
+  private Queue<UnderReplicatedHealthResult> underRepQueue;
+  private Queue<OverReplicatedHealthResult> overRepQueue;
+  private int maintenanceRedundancy = 2;
+  private ContainerCheckRequest.Builder requestBuilder;
+  private ReplicationManagerReport report;
+
+
+  @Before
+  public void setup() {
+    healthCheck = new ECReplicationCheckHandler();
+    repConfig = new ECReplicationConfig(3, 2);
+    underRepQueue = new LinkedList<>();
+    overRepQueue = new LinkedList<>();
+    report = new ReplicationManagerReport();
+    requestBuilder = new ContainerCheckRequest.Builder()
+        .setOverRepQueue(overRepQueue)
+        .setUnderRepQueue(underRepQueue)
+        .setMaintenanceRedundancy(maintenanceRedundancy)
+        .setPendingOps(Collections.emptyList())
+        .setReport(report);
+  }
+
+  @Test
+  public void testHealthyContainerIsHealthy() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas
+        = createReplicas(container.containerID(), 1, 2, 3, 4, 5);
+    ContainerCheckRequest request = requestBuilder
+        .setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .build();
+
+    ContainerHealthResult result = healthCheck.checkHealth(request);
+    Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
+
+    Assert.assertFalse(healthCheck.handle(request));
+    Assert.assertEquals(0, underRepQueue.size());
+    Assert.assertEquals(0, overRepQueue.size());
+  }
+
+  @Test
+  public void testUnderReplicatedContainerIsUnderReplicated() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas
+        = createReplicas(container.containerID(), 1, 2, 4, 5);
+    ContainerCheckRequest request = requestBuilder
+        .setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .build();
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(request);
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(1, result.getRemainingRedundancy());
+    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertFalse(result.underReplicatedDueToDecommission());
+
+    Assert.assertTrue(healthCheck.handle(request));
+    Assert.assertEquals(1, underRepQueue.size());
+    Assert.assertEquals(0, overRepQueue.size());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+  }
+
+  @Test
+  public void testUnderReplicatedContainerFixedWithPending() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas
+        = createReplicas(container.containerID(), 1, 2, 4, 5);
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        ADD, MockDatanodeDetails.randomDatanodeDetails(), 3));
+    ContainerCheckRequest request = requestBuilder
+        .setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .setPendingOps(pending)
+        .build();
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(request);
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(1, result.getRemainingRedundancy());
+    Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertFalse(result.underReplicatedDueToDecommission());
+
+    Assert.assertTrue(healthCheck.handle(request));
+    // Fixed with pending so nothing added to the queue
+    Assert.assertEquals(0, underRepQueue.size());
+    Assert.assertEquals(0, overRepQueue.size());
+    // Still under replicated until the pending complete
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+  }
+
+  @Test
+  public void testUnderReplicatedDueToDecommission() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+        Pair.of(IN_SERVICE, 3), Pair.of(DECOMMISSIONING, 4),
+        Pair.of(DECOMMISSIONED, 5));
+    ContainerCheckRequest request = requestBuilder
+        .setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .build();
+
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(request);
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(2, result.getRemainingRedundancy());
+    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertTrue(result.underReplicatedDueToDecommission());
+
+    Assert.assertTrue(healthCheck.handle(request));
+    Assert.assertEquals(1, underRepQueue.size());
+    Assert.assertEquals(0, overRepQueue.size());
+    // Still under replicated until the pending complete
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+  }
+
+  @Test
+  public void testUnderReplicatedDueToDecommissionFixedWithPending() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+        Pair.of(IN_SERVICE, 3), Pair.of(DECOMMISSIONING, 4),
+        Pair.of(IN_SERVICE, 4), Pair.of(DECOMMISSIONED, 5));
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        ADD, MockDatanodeDetails.randomDatanodeDetails(), 5));
+    ContainerCheckRequest request = requestBuilder
+        .setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .setPendingOps(pending)
+        .build();
+
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(request);
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(2, result.getRemainingRedundancy());
+    Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertTrue(result.underReplicatedDueToDecommission());
+
+    Assert.assertTrue(healthCheck.handle(request));
+    // Fixed with pending so nothing added to the queue
+    Assert.assertEquals(0, underRepQueue.size());
+    Assert.assertEquals(0, overRepQueue.size());
+    // Still under replicated until the pending complete
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+  }
+
+  @Test
+  public void testUnderReplicatedDueToDecommissionAndMissingReplica() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+        Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONED, 5));
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        ADD, MockDatanodeDetails.randomDatanodeDetails(), 3));
+
+    ContainerCheckRequest request = requestBuilder
+        .setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .setPendingOps(pending)
+        .build();
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(request);
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(1, result.getRemainingRedundancy());
+    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertFalse(result.underReplicatedDueToDecommission());
+
+    Assert.assertTrue(healthCheck.handle(request));
+    Assert.assertEquals(1, underRepQueue.size());
+    Assert.assertEquals(0, overRepQueue.size());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+  }
+
+  @Test
+  public void testUnderReplicatedAndUnrecoverable() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
+    ContainerCheckRequest request = requestBuilder
+        .setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .build();
+
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(request);
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(-1, result.getRemainingRedundancy());
+    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertFalse(result.underReplicatedDueToDecommission());
+    Assert.assertTrue(result.isUnrecoverable());
+
+    Assert.assertTrue(healthCheck.handle(request));
+    // Unrecoverable so not added to the queue
+    Assert.assertEquals(0, underRepQueue.size());
+    Assert.assertEquals(0, overRepQueue.size());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.MISSING));
+  }
+
+  @Test
+  public void testOverReplicatedContainer() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas =  createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+        Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
+        Pair.of(IN_SERVICE, 5),
+        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
+
+    ContainerCheckRequest request = requestBuilder
+        .setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .build();
+
+    OverReplicatedHealthResult result = (OverReplicatedHealthResult)
+        healthCheck.checkHealth(request);
+    Assert.assertEquals(HealthState.OVER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(2, result.getExcessRedundancy());
+    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+
+    Assert.assertTrue(healthCheck.handle(request));
+    Assert.assertEquals(0, underRepQueue.size());
+    Assert.assertEquals(1, overRepQueue.size());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
+  }
+
+  @Test
+  public void testOverReplicatedContainerFixedByPending() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas =  createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+        Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
+        Pair.of(IN_SERVICE, 5),
+        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
+
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        DELETE, MockDatanodeDetails.randomDatanodeDetails(), 1));
+    pending.add(ContainerReplicaOp.create(
+        DELETE, MockDatanodeDetails.randomDatanodeDetails(), 2));
+    ContainerCheckRequest request = requestBuilder
+        .setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .setPendingOps(pending)
+        .build();
+
+    OverReplicatedHealthResult result = (OverReplicatedHealthResult)
+        healthCheck.checkHealth(request);
+    Assert.assertEquals(HealthState.OVER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(2, result.getExcessRedundancy());
+    Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+
+    Assert.assertTrue(healthCheck.handle(request));
+    Assert.assertEquals(0, underRepQueue.size());
+    Assert.assertEquals(0, overRepQueue.size());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
+  }
+
+  @Test
+  public void testOverReplicatedContainerDueToMaintenance() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas =  createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+        Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
+        Pair.of(IN_SERVICE, 5),
+        Pair.of(IN_MAINTENANCE, 1), Pair.of(IN_MAINTENANCE, 2));
+    ContainerCheckRequest request = requestBuilder
+        .setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .build();
+    ContainerHealthResult result = healthCheck.checkHealth(request);
+    Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
+
+    // As it is maintenance replicas causing the over replication, the container
+    // is not really over-replicated.
+    Assert.assertFalse(healthCheck.handle(request));
+    Assert.assertEquals(0, underRepQueue.size());
+    Assert.assertEquals(0, overRepQueue.size());
+    Assert.assertEquals(0, report.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
+  }
+
+  @Test
+  public void testOverAndUnderReplicated() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas =  createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+        Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
+        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
+    ContainerCheckRequest request = requestBuilder
+        .setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .build();
+    ContainerHealthResult result = healthCheck.checkHealth(request);
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(1,
+        ((UnderReplicatedHealthResult)result).getRemainingRedundancy());
+
+    // Under-replicated takes precedence and the over-replication is ignored
+    // for now.
+    Assert.assertTrue(healthCheck.handle(request));
+    Assert.assertEquals(1, underRepQueue.size());
+    Assert.assertEquals(0, overRepQueue.size());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+    Assert.assertEquals(0, report.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestOpenContainerHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestOpenContainerHandler.java
new file mode 100644
index 0000000..1acb4ec
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestOpenContainerHandler.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
+import static org.mockito.Mockito.times;
+
+/**
+ * Tests for the OpenContainerHandler class.
+ */
+public class TestOpenContainerHandler {
+
+  private ReplicationManager replicationManager;
+  private OpenContainerHandler openContainerHandler;
+  private ECReplicationConfig replicationConfig;
+
+  @BeforeEach
+  public void setup() {
+    replicationConfig = new ECReplicationConfig(3, 2);
+    replicationManager = Mockito.mock(ReplicationManager.class);
+    openContainerHandler = new OpenContainerHandler(replicationManager);
+  }
+
+  @Test
+  public void testClosedContainerReturnsTrue() {
+    ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+        replicationConfig, 1, CLOSED);
+    Set<ContainerReplica> containerReplicas = ReplicationTestUtil
+        .createReplicas(containerInfo.containerID(),
+            ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4, 5);
+    ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+        .setPendingOps(Collections.EMPTY_LIST)
+        .setReport(new ReplicationManagerReport())
+        .setContainerInfo(containerInfo)
+        .setContainerReplicas(containerReplicas)
+        .build();
+    Assertions.assertFalse(openContainerHandler.handle(request));
+    Mockito.verify(replicationManager, times(0))
+        .sendCloseContainerEvent(Mockito.any());
+  }
+
+  @Test
+  public void testOpenContainerReturnsFalse() {
+    ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+        replicationConfig, 1, OPEN);
+    Set<ContainerReplica> containerReplicas = ReplicationTestUtil
+        .createReplicas(containerInfo.containerID(),
+            ContainerReplicaProto.State.OPEN, 1, 2, 3, 4, 5);
+    ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+        .setPendingOps(Collections.EMPTY_LIST)
+        .setReport(new ReplicationManagerReport())
+        .setContainerInfo(containerInfo)
+        .setContainerReplicas(containerReplicas)
+        .build();
+    Assertions.assertTrue(openContainerHandler.handle(request));
+    Mockito.verify(replicationManager, times(0))
+        .sendCloseContainerEvent(Mockito.any());
+  }
+
+  @Test
+  public void testOpenUnhealthyContainerIsClosed() {
+    ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+        replicationConfig, 1, OPEN);
+    Set<ContainerReplica> containerReplicas = ReplicationTestUtil
+        .createReplicas(containerInfo.containerID(),
+            ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4);
+    ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+        .setPendingOps(Collections.EMPTY_LIST)
+        .setReport(new ReplicationManagerReport())
+        .setContainerInfo(containerInfo)
+        .setContainerReplicas(containerReplicas)
+        .build();
+    Assertions.assertTrue(openContainerHandler.handle(request));
+    Mockito.verify(replicationManager, times(1))
+        .sendCloseContainerEvent(containerInfo.containerID());
+  }
+
+}