HDDS-6975. EC: Define the value of Maintenance Redundancy for EC containers (#3723)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
index 7ef57a5..3c1176c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
@@ -173,7 +173,7 @@
         ", sequenceId=" + sequenceId +
         ", keyCount=" + keyCount +
         ", bytesUsed=" + bytesUsed + ((replicaIndex > 0) ?
-        ",replicaIndex= " + replicaIndex :
+        ",replicaIndex=" + replicaIndex :
         "") +
         '}';
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 5b8d6da..e3c3672 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -152,6 +152,7 @@
       overRepQueue;
   private final ECUnderReplicationHandler ecUnderReplicationHandler;
   private final ECOverReplicationHandler ecOverReplicationHandler;
+  private final int maintenanceRedundancy;
 
   /**
    * Constructs ReplicationManager instance with the given configuration.
@@ -190,6 +191,7 @@
     this.nodeManager = nodeManager;
     this.underRepQueue = createUnderReplicatedQueue();
     this.overRepQueue = new LinkedList<>();
+    this.maintenanceRedundancy = rmConf.maintenanceRemainingRedundancy;
     ecUnderReplicationHandler = new ECUnderReplicationHandler(
         containerPlacement, conf, nodeManager);
     ecOverReplicationHandler =
@@ -370,7 +372,7 @@
     List<ContainerReplicaOp> pendingOps =
         containerReplicaPendingOps.getPendingOps(containerID);
     return ecUnderReplicationHandler.processAndCreateCommands(replicas,
-        pendingOps, result, 0);
+        pendingOps, result, maintenanceRedundancy);
   }
 
   public Map<DatanodeDetails, SCMCommand<?>> processOverReplicatedContainer(
@@ -381,7 +383,7 @@
     List<ContainerReplicaOp> pendingOps =
         containerReplicaPendingOps.getPendingOps(containerID);
     return ecOverReplicationHandler.processAndCreateCommands(replicas,
-        pendingOps, result, 0);
+        pendingOps, result, maintenanceRedundancy);
   }
 
   public long getScmTerm() throws NotLeaderException {
@@ -419,8 +421,8 @@
 
     List<ContainerReplicaOp> pendingOps =
         containerReplicaPendingOps.getPendingOps(containerID);
-    ContainerHealthResult health = ecContainerHealthCheck
-        .checkHealth(containerInfo, replicas, pendingOps, 0);
+    ContainerHealthResult health = ecContainerHealthCheck.checkHealth(
+        containerInfo, replicas, pendingOps, maintenanceRedundancy);
       // TODO - should the report have a HEALTHY state, rather than just bad
       //        states? It would need to be added to legacy RM too.
     if (health.getHealthState()
@@ -686,6 +688,41 @@
       this.maintenanceReplicaMinimum = replicaCount;
     }
 
+    /**
+     * Defines how many redundant replicas of a container must be online for a
+     * node to enter maintenance. Currently, only used for EC containers. We
+     * need to consider removing the "maintenance.replica.minimum" setting
+     * and having both Ratis and EC use this new one.
+     */
+    @Config(key = "maintenance.remaining.redundancy",
+        type = ConfigType.INT,
+        defaultValue = "1",
+        tags = {SCM, OZONE},
+        description = "The number of redundant containers in a group which" +
+            " must be available for a node to enter maintenance. If putting" +
+            " a node into maintenance reduces the redundancy below this value" +
+            " , the node will remain in the ENTERING_MAINTENANCE state until" +
+            " a new replica is created. For Ratis containers, the default" +
+            " value of 1 ensures at least two replicas are online, meaning 1" +
+            " more can be lost without data becoming unavailable. For any EC" +
+            " container it will have at least dataNum + 1 online, allowing" +
+            " the loss of 1 more replica before data becomes unavailable." +
+            " Currently only EC containers use this setting. Ratis containers" +
+            " use hdds.scm.replication.maintenance.replica.minimum. For EC," +
+            " if nodes are in maintenance, it is likely reconstruction reads" +
+            " will be required if some of the data replicas are offline. This" +
+            " is seamless to the client, but will affect read performance."
+    )
+    private int maintenanceRemainingRedundancy = 1;
+
+    public void setMaintenanceRemainingRedundancy(int redundancy) {
+      this.maintenanceRemainingRedundancy = redundancy;
+    }
+
+    public int getMaintenanceRemainingRedundancy() {
+      return maintenanceRemainingRedundancy;
+    }
+
     public long getInterval() {
       return interval;
     }
@@ -694,6 +731,14 @@
       return underReplicatedInterval;
     }
 
+    public void setUnderReplicatedInterval(Duration duration) {
+      this.underReplicatedInterval = duration.toMillis();
+    }
+
+    public void setOverReplicatedInterval(Duration duration) {
+      this.overReplicatedInterval = duration.toMillis();
+    }
+
     public long getOverReplicatedInterval() {
       return overReplicatedInterval;
     }
@@ -796,8 +841,8 @@
         containerInfo.containerID());
     List<ContainerReplicaOp> pendingOps =
         containerReplicaPendingOps.getPendingOps(containerInfo.containerID());
-    // TODO: define maintenance redundancy for EC (HDDS-6975)
-    return new ECContainerReplicaCount(containerInfo, replicas, pendingOps, 0);
+    return new ECContainerReplicaCount(
+        containerInfo, replicas, pendingOps, maintenanceRedundancy);
   }
 
   /**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
index eb8f69a..1e7a8b4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.Scanner;
 
+import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.protocol.StorageType;
@@ -99,8 +100,16 @@
   public static void createKey(OzoneBucket bucket, String keyName,
       ReplicationFactor repFactor, ReplicationType repType, String content)
       throws IOException {
+    ReplicationConfig repConfig = ReplicationConfig
+        .fromTypeAndFactor(repType, repFactor);
+    createKey(bucket, keyName, repConfig, content);
+  }
+
+  public static void createKey(OzoneBucket bucket, String keyName,
+      ReplicationConfig repConfig, String content)
+      throws IOException {
     try (OutputStream stream = bucket
-        .createKey(keyName, content.length(), repType, repFactor,
+        .createKey(keyName, content.length(), repConfig,
             new HashMap<>())) {
       stream.write(content.getBytes(UTF_8));
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
index b2c4336..9d83c56 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.ozone.scm.node;
 
-import org.apache.hadoop.hdds.client.ReplicationFactor;
-import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -57,13 +59,14 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
@@ -90,9 +93,13 @@
   private static final Logger LOG =
       LoggerFactory.getLogger(TestDecommissionAndMaintenance.class);
 
-  private static int numOfDatanodes = 6;
+  private static int numOfDatanodes = 7;
   private static String bucketName = "bucket1";
   private static String volName = "vol1";
+  private static RatisReplicationConfig ratisRepConfig =
+      RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
+  private static ECReplicationConfig ecRepConfig =
+      new ECReplicationConfig(3, 2);
   private OzoneBucket bucket;
   private MiniOzoneCluster cluster;
   private NodeManager nm;
@@ -121,16 +128,25 @@
     conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
     conf.setTimeDuration(OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
         1, SECONDS);
+    conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_CONTAINER_REPLICA_OP_TIME_OUT,
+        10, SECONDS);
+    conf.setTimeDuration(
+        ScmConfigKeys.OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL,
+        1, SECONDS);
+    conf.setTimeDuration(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+        0, SECONDS);
 
     ReplicationManagerConfiguration replicationConf =
         conf.getObject(ReplicationManagerConfiguration.class);
     replicationConf.setInterval(Duration.ofSeconds(1));
+    replicationConf.setUnderReplicatedInterval(Duration.ofSeconds(1));
+    replicationConf.setOverReplicatedInterval(Duration.ofSeconds(1));
     conf.setFromObject(replicationConf);
 
     MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(numOfDatanodes);
 
-    clusterProvider = new MiniOzoneClusterProvider(conf, builder, 8);
+    clusterProvider = new MiniOzoneClusterProvider(conf, builder, 7);
   }
 
   @AfterAll
@@ -163,15 +179,28 @@
   public void testNodeWithOpenPipelineCanBeDecommissionedAndRecommissioned()
       throws Exception {
     // Generate some data on the empty cluster to create some containers
-    generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+    generateData(20, "key", ratisRepConfig);
+    generateData(20, "ecKey", ecRepConfig);
 
     // Locate any container and find its open pipeline
-    final ContainerInfo container = waitForAndReturnContainer();
+    final ContainerInfo container =
+        waitForAndReturnContainer(ratisRepConfig, 3);
+    final ContainerInfo ecContainer = waitForAndReturnContainer(ecRepConfig, 5);
     Pipeline pipeline = pm.getPipeline(container.getPipelineID());
+    Pipeline ecPipeline = pm.getPipeline(ecContainer.getPipelineID());
     assertEquals(Pipeline.PipelineState.OPEN, pipeline.getPipelineState());
-    Set<ContainerReplica> replicas = getContainerReplicas(container);
+    assertEquals(Pipeline.PipelineState.OPEN, ecPipeline.getPipelineState());
+    // Find a DN to decommission that is in both the EC and Ratis pipeline.
+    // There are 7 DNs. The EC pipeline must have 5 and the Ratis must have 3
+    // so there must be an intersecting DN in both lists.
+    // Once we have a DN id, look it up in the NM, as the datanodeDetails
+    // instance in the pipeline may not be the same as the one stored in the
+    // NM.
+    final UUID dnID = pipeline.getNodes().stream()
+        .filter(node -> ecPipeline.getNodes().contains(node))
+        .findFirst().get().getUuid();
+    final DatanodeDetails toDecommission = nm.getNodeByUuid(dnID.toString());
 
-    final DatanodeDetails toDecommission = getOneDNHostingReplica(replicas);
     scmClient.decommissionNodes(Arrays.asList(
         getDNHostAndPort(toDecommission)));
 
@@ -185,6 +214,8 @@
     // Should now be 4 replicas online as the DN is still alive but
     // in the DECOMMISSIONED state.
     waitForContainerReplicas(container, 4);
+    // In the EC case, there should be 6 online
+    waitForContainerReplicas(ecContainer, 6);
 
     // Stop the decommissioned DN
     int dnIndex = cluster.getHddsDatanodeIndex(toDecommission);
@@ -194,6 +225,8 @@
     // Now the decommissioned node is dead, we should have
     // 3 replicas for the tracked container.
     waitForContainerReplicas(container, 3);
+    // In the EC case, there should be 5 online
+    waitForContainerReplicas(ecContainer, 5);
 
     cluster.restartHddsDatanode(dnIndex, true);
     scmClient.recommissionNodes(Arrays.asList(
@@ -207,6 +240,8 @@
   // when it re-registers it should re-enter the decommission workflow and
   // complete decommissioning. If SCM is restarted after decommssion is complete
   // then SCM should learn of the decommissioned DN when it registers.
+  // If the DN is then stopped and recommissioned, when its state should
+  // move to IN_SERVICE when it is restarted.
   public void testDecommissioningNodesCompleteDecommissionOnSCMRestart()
       throws Exception {
     // First stop the replicationManager so nodes marked for decommission cannot
@@ -215,8 +250,10 @@
     // Generate some data and then pick a DN to decommission which is hosting a
     // container. This ensures it will not decommission immediately due to
     // having no containers.
-    generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
-    final ContainerInfo container = waitForAndReturnContainer();
+    generateData(20, "key", ratisRepConfig);
+    generateData(20, "ecKey", ecRepConfig);
+    final ContainerInfo container =
+        waitForAndReturnContainer(ratisRepConfig, 3);
     final DatanodeDetails dn
         = getOneDNHostingReplica(getContainerReplicas(container));
     scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
@@ -244,40 +281,24 @@
     // Also confirm the datanodeDetails correctly reflect the operational
     // state.
     waitForDnToReachPersistedOpState(newDn, DECOMMISSIONED);
-  }
 
-  @Test
-  // If a node was decommissioned, and then stopped so it is dead. Then it is
-  // recommissioned in SCM and restarted, the SCM state should be taken as the
-  // source of truth and the node will go to the IN_SERVICE state and the state
-  // should be updated on the DN.
-  public void testStoppedDecommissionedNodeTakesSCMStateOnRestart()
-      throws Exception {
-    // Decommission node and wait for it to be DECOMMISSIONED
-    generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
-
-    DatanodeDetails dn = nm.getAllNodes().get(0);
-    scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
-    waitForDnToReachOpState(dn, DECOMMISSIONED);
-    waitForDnToReachPersistedOpState(dn, DECOMMISSIONED);
-
+    // Now stop the DN and recommission it in SCM. When it restarts, it should
+    // reflect the state of in SCM, in IN_SERVICE.
     int dnIndex = cluster.getHddsDatanodeIndex(dn);
     cluster.shutdownHddsDatanode(dnIndex);
     waitForDnToReachHealthState(dn, DEAD);
-
     // Datanode is shutdown and dead. Now recommission it in SCM
     scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
-
     // Now restart it and ensure it remains IN_SERVICE
     cluster.restartHddsDatanode(dnIndex, true);
-    DatanodeDetails newDn = nm.getNodeByUuid(dn.getUuid().toString());
+    newDn = nm.getNodeByUuid(dn.getUuid().toString());
 
     // As this is not an initial registration since SCM was started, the DN
     // should report its operational state and if it differs from what SCM
     // has, then the SCM state should be used and the DN state updated.
     waitForDnToReachHealthState(newDn, HEALTHY);
     waitForDnToReachOpState(newDn, IN_SERVICE);
-    waitForDnToReachPersistedOpState(dn, IN_SERVICE);
+    waitForDnToReachPersistedOpState(newDn, IN_SERVICE);
   }
 
   @Test
@@ -290,15 +311,29 @@
   public void testSingleNodeWithOpenPipelineCanGotoMaintenance()
       throws Exception {
     // Generate some data on the empty cluster to create some containers
-    generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+    generateData(20, "key", ratisRepConfig);
+    generateData(20, "ecKey", ecRepConfig);
 
     // Locate any container and find its open pipeline
-    final ContainerInfo container = waitForAndReturnContainer();
+    final ContainerInfo container =
+        waitForAndReturnContainer(ratisRepConfig, 3);
+    final ContainerInfo ecContainer =
+        waitForAndReturnContainer(ecRepConfig, 5);
     Pipeline pipeline = pm.getPipeline(container.getPipelineID());
+    Pipeline ecPipeline = pm.getPipeline(ecContainer.getPipelineID());
     assertEquals(Pipeline.PipelineState.OPEN, pipeline.getPipelineState());
-    Set<ContainerReplica> replicas = getContainerReplicas(container);
+    assertEquals(Pipeline.PipelineState.OPEN, ecPipeline.getPipelineState());
+    // Find a DN to decommission that is in both the EC and Ratis pipeline.
+    // There are 7 DNs. The EC pipeline must have 5 and the Ratis must have 3
+    // so there must be an intersecting DN in both lists.
+    // Once we have a DN id, look it up in the NM, as the datanodeDetails
+    // instance in the pipeline may not be the same as the one stored in the
+    // NM.
+    final UUID dnID = pipeline.getNodes().stream()
+        .filter(node -> ecPipeline.getNodes().contains(node))
+        .findFirst().get().getUuid();
+    final DatanodeDetails dn = nm.getNodeByUuid(dnID.toString());
 
-    final DatanodeDetails dn = getOneDNHostingReplica(replicas);
     scmClient.startMaintenanceNodes(Arrays.asList(
         getDNHostAndPort(dn)), 0);
 
@@ -309,7 +344,10 @@
     // maintenance
     Set<ContainerReplica> newReplicas =
         cm.getContainerReplicas(container.containerID());
+    Set<ContainerReplica> ecReplicas =
+        cm.getContainerReplicas(ecContainer.containerID());
     assertEquals(3, newReplicas.size());
+    assertEquals(5, ecReplicas.size());
 
     // Stop the maintenance DN
     cluster.shutdownHddsDatanode(dn);
@@ -318,7 +356,9 @@
     // Now the maintenance node is dead, we should still have
     // 3 replicas as we don't purge the replicas for a dead maintenance node
     newReplicas = cm.getContainerReplicas(container.containerID());
+    ecReplicas = cm.getContainerReplicas(ecContainer.containerID());
     assertEquals(3, newReplicas.size());
+    assertEquals(5, ecReplicas.size());
 
     // Restart the DN and it should keep the IN_MAINTENANCE state
     cluster.restartHddsDatanode(dn, true);
@@ -347,24 +387,25 @@
   }
 
   @Test
-  // By default a node can enter maintenance if there are two replicas left
-  // available when the maintenance nodes are stopped. Therefore putting all
+  // By default, a node can enter maintenance if there are two replicas left
+  // available when the maintenance nodes are stopped. Therefore, putting all
   // nodes hosting a replica to maintenance should cause new replicas to get
   // created before the nodes can enter maintenance. When the maintenance nodes
   // return, the excess replicas should be removed.
   public void testContainerIsReplicatedWhenAllNodesGotoMaintenance()
       throws Exception {
     // Generate some data on the empty cluster to create some containers
-    generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+    generateData(20, "key", ratisRepConfig);
     // Locate any container and find its open pipeline
-    final ContainerInfo container = waitForAndReturnContainer();
+    final ContainerInfo container =
+        waitForAndReturnContainer(ratisRepConfig, 3);
     Set<ContainerReplica> replicas = getContainerReplicas(container);
 
     List<DatanodeDetails> forMaintenance = new ArrayList<>();
     replicas.forEach(r -> forMaintenance.add(r.getDatanodeDetails()));
 
     scmClient.startMaintenanceNodes(forMaintenance.stream()
-        .map(d -> getDNHostAndPort(d))
+        .map(this::getDNHostAndPort)
         .collect(Collectors.toList()), 0);
 
     // Ensure all 3 DNs go to maintenance
@@ -385,8 +426,32 @@
     for (DatanodeDetails dn : forMaintenance) {
       waitForDnToReachOpState(dn, IN_SERVICE);
     }
-
     waitForContainerReplicas(container, 3);
+
+    // Now write some EC data and put two nodes into maintenance. This should
+    // result in at least 1 extra replica getting created.
+    generateData(20, "eckey", ecRepConfig);
+    final ContainerInfo ecContainer =
+        waitForAndReturnContainer(ecRepConfig, 5);
+    List<DatanodeDetails> ecMaintenance = replicas.stream()
+        .map(ContainerReplica::getDatanodeDetails)
+        .limit(2)
+        .collect(Collectors.toList());
+    scmClient.startMaintenanceNodes(ecMaintenance.stream()
+        .map(this::getDNHostAndPort)
+        .collect(Collectors.toList()), 0);
+    for (DatanodeDetails dn : ecMaintenance) {
+      waitForDnToReachPersistedOpState(dn, IN_MAINTENANCE);
+    }
+    assertTrue(cm.getContainerReplicas(ecContainer.containerID()).size() >= 6);
+    scmClient.recommissionNodes(forMaintenance.stream()
+        .map(this::getDNHostAndPort)
+        .collect(Collectors.toList()));
+    // Ensure the 2 DNs go to IN_SERVICE
+    for (DatanodeDetails dn : ecMaintenance) {
+      waitForDnToReachOpState(dn, IN_SERVICE);
+    }
+    waitForContainerReplicas(ecContainer, 5);
   }
 
   @Test
@@ -397,16 +462,17 @@
     // Stop Replication Manager to sure no containers are replicated
     stopReplicationManager();
     // Generate some data on the empty cluster to create some containers
-    generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+    generateData(20, "key", ratisRepConfig);
     // Locate any container and find its open pipeline
-    final ContainerInfo container = waitForAndReturnContainer();
+    final ContainerInfo container =
+        waitForAndReturnContainer(ratisRepConfig, 3);
     Set<ContainerReplica> replicas = getContainerReplicas(container);
 
     List<DatanodeDetails> forMaintenance = new ArrayList<>();
     replicas.forEach(r -> forMaintenance.add(r.getDatanodeDetails()));
 
     scmClient.startMaintenanceNodes(forMaintenance.stream()
-        .map(d -> getDNHostAndPort(d))
+        .map(this::getDNHostAndPort)
         .collect(Collectors.toList()), 0);
 
     // Ensure all 3 DNs go to entering_maintenance
@@ -440,8 +506,8 @@
   public void testMaintenanceEndsAutomaticallyAtTimeout()
       throws Exception {
     // Generate some data on the empty cluster to create some containers
-    generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
-    ContainerInfo container = waitForAndReturnContainer();
+    generateData(20, "key", ratisRepConfig);
+    ContainerInfo container = waitForAndReturnContainer(ratisRepConfig, 3);
     DatanodeDetails dn =
         getOneDNHostingReplica(getContainerReplicas(container));
 
@@ -482,8 +548,8 @@
   public void testSCMHandlesRestartForMaintenanceNode()
       throws Exception {
     // Generate some data on the empty cluster to create some containers
-    generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
-    ContainerInfo container = waitForAndReturnContainer();
+    generateData(20, "key", ratisRepConfig);
+    ContainerInfo container = waitForAndReturnContainer(ratisRepConfig, 3);
     DatanodeDetails dn =
         getOneDNHostingReplica(getContainerReplicas(container));
 
@@ -547,14 +613,13 @@
    * Generates some data on the cluster so the cluster has some containers.
    * @param keyCount The number of keys to create
    * @param keyPrefix The prefix to use for the key name.
-   * @param repFactor The replication Factor for the keys
-   * @param repType The replication Type for the keys
+   * @param replicationConfig The replication config for the keys
    * @throws IOException
    */
   private void generateData(int keyCount, String keyPrefix,
-      ReplicationFactor repFactor, ReplicationType repType) throws IOException {
+      ReplicationConfig replicationConfig) throws IOException {
     for (int i = 0; i < keyCount; i++) {
-      TestDataUtil.createKey(bucket, keyPrefix + i, repFactor, repType,
+      TestDataUtil.createKey(bucket, keyPrefix + i, replicationConfig,
           "this is the content");
     }
   }
@@ -667,10 +732,18 @@
    * @return A single container present on the cluster
    * @throws Exception
    */
-  private ContainerInfo waitForAndReturnContainer() throws Exception {
-    final ContainerInfo container = cm.getContainers().get(0);
-    // Ensure all 3 replicas of the container have been reported via ICR
-    waitForContainerReplicas(container, 3);
+  private ContainerInfo waitForAndReturnContainer(ReplicationConfig repConfig,
+      int expectedReplicas) throws Exception {
+    List<ContainerInfo> containers = cm.getContainers();
+    ContainerInfo container = null;
+    for (ContainerInfo c : containers) {
+      if (c.getReplicationConfig().equals(repConfig)) {
+        container = c;
+        break;
+      }
+    }
+    // Ensure expected replicas of the container have been reported via ICR
+    waitForContainerReplicas(container, expectedReplicas);
     return container;
   }