HDDS-6975. EC: Define the value of Maintenance Redundancy for EC containers (#3723)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
index 7ef57a5..3c1176c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
@@ -173,7 +173,7 @@
", sequenceId=" + sequenceId +
", keyCount=" + keyCount +
", bytesUsed=" + bytesUsed + ((replicaIndex > 0) ?
- ",replicaIndex= " + replicaIndex :
+ ",replicaIndex=" + replicaIndex :
"") +
'}';
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 5b8d6da..e3c3672 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -152,6 +152,7 @@
overRepQueue;
private final ECUnderReplicationHandler ecUnderReplicationHandler;
private final ECOverReplicationHandler ecOverReplicationHandler;
+ private final int maintenanceRedundancy;
/**
* Constructs ReplicationManager instance with the given configuration.
@@ -190,6 +191,7 @@
this.nodeManager = nodeManager;
this.underRepQueue = createUnderReplicatedQueue();
this.overRepQueue = new LinkedList<>();
+ this.maintenanceRedundancy = rmConf.maintenanceRemainingRedundancy;
ecUnderReplicationHandler = new ECUnderReplicationHandler(
containerPlacement, conf, nodeManager);
ecOverReplicationHandler =
@@ -370,7 +372,7 @@
List<ContainerReplicaOp> pendingOps =
containerReplicaPendingOps.getPendingOps(containerID);
return ecUnderReplicationHandler.processAndCreateCommands(replicas,
- pendingOps, result, 0);
+ pendingOps, result, maintenanceRedundancy);
}
public Map<DatanodeDetails, SCMCommand<?>> processOverReplicatedContainer(
@@ -381,7 +383,7 @@
List<ContainerReplicaOp> pendingOps =
containerReplicaPendingOps.getPendingOps(containerID);
return ecOverReplicationHandler.processAndCreateCommands(replicas,
- pendingOps, result, 0);
+ pendingOps, result, maintenanceRedundancy);
}
public long getScmTerm() throws NotLeaderException {
@@ -419,8 +421,8 @@
List<ContainerReplicaOp> pendingOps =
containerReplicaPendingOps.getPendingOps(containerID);
- ContainerHealthResult health = ecContainerHealthCheck
- .checkHealth(containerInfo, replicas, pendingOps, 0);
+ ContainerHealthResult health = ecContainerHealthCheck.checkHealth(
+ containerInfo, replicas, pendingOps, maintenanceRedundancy);
// TODO - should the report have a HEALTHY state, rather than just bad
// states? It would need to be added to legacy RM too.
if (health.getHealthState()
@@ -686,6 +688,41 @@
this.maintenanceReplicaMinimum = replicaCount;
}
+ /**
+ * Defines how many redundant replicas of a container must be online for a
+ * node to enter maintenance. Currently this is only used for EC containers.
+ * We need to consider removing the "maintenance.replica.minimum" setting
+ * and having both Ratis and EC use this new one.
+ */
+ @Config(key = "maintenance.remaining.redundancy",
+ type = ConfigType.INT,
+ defaultValue = "1",
+ tags = {SCM, OZONE},
+ description = "The number of redundant containers in a group which" +
+ " must be available for a node to enter maintenance. If putting" +
+ " a node into maintenance reduces the redundancy below this" +
+ " value, the node will remain in the ENTERING_MAINTENANCE state" +
+ " until a new replica is created. For Ratis containers, the" +
+ " default value of 1 ensures at least two replicas are online," +
+ " meaning 1 more can be lost without data becoming unavailable." +
+ " For an EC container, the default ensures at least dataNum + 1" +
+ " replicas are online, allowing the loss of 1 more replica before" +
+ " data becomes unavailable. Currently only EC containers use this" +
+ " setting. Ratis containers use" +
+ " hdds.scm.replication.maintenance.replica.minimum. For EC, if" +
+ " nodes are in maintenance, reconstruction reads will likely be" +
+ " required if some of the data replicas are offline. This is" +
+ " seamless to the client, but will affect read performance."
+ )
+ private int maintenanceRemainingRedundancy = 1;
+
+ public void setMaintenanceRemainingRedundancy(int redundancy) {
+ this.maintenanceRemainingRedundancy = redundancy;
+ }
+
+ public int getMaintenanceRemainingRedundancy() {
+ return maintenanceRemainingRedundancy;
+ }
+
public long getInterval() {
return interval;
}
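For context, a minimal sketch of how the new setting could be overridden, either via configuration or directly on the config object; the full property name below is an assumption based on the usual "hdds.scm.replication" prefix of ReplicationManagerConfiguration (the same prefix used by maintenance.replica.minimum in the description above):

```java
// Sketch only; assumes the "hdds.scm.replication" prefix for the new key.
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt("hdds.scm.replication.maintenance.remaining.redundancy", 2);

// Or override it programmatically, as the tests below do for the
// under/over-replicated intervals.
ReplicationManagerConfiguration rmConf =
    conf.getObject(ReplicationManagerConfiguration.class);
rmConf.setMaintenanceRemainingRedundancy(2);
conf.setFromObject(rmConf);
```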
@@ -694,6 +731,14 @@
return underReplicatedInterval;
}
+ public void setUnderReplicatedInterval(Duration duration) {
+ this.underReplicatedInterval = duration.toMillis();
+ }
+
+ public void setOverReplicatedInterval(Duration duration) {
+ this.overReplicatedInterval = duration.toMillis();
+ }
+
public long getOverReplicatedInterval() {
return overReplicatedInterval;
}
@@ -796,8 +841,8 @@
containerInfo.containerID());
List<ContainerReplicaOp> pendingOps =
containerReplicaPendingOps.getPendingOps(containerInfo.containerID());
- // TODO: define maintenance redundancy for EC (HDDS-6975)
- return new ECContainerReplicaCount(containerInfo, replicas, pendingOps, 0);
+ return new ECContainerReplicaCount(
+ containerInfo, replicas, pendingOps, maintenanceRedundancy);
}
/**
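As a rough illustration of how the maintenance redundancy feeds the EC replica count and health check above, the rule reduces to the following check. This is a simplified sketch of the behaviour described in the config text, not the actual ECContainerReplicaCount implementation:

```java
// Simplified restatement of the rule in the config description above.
static boolean sufficientForMaintenance(ECReplicationConfig repConfig,
    int onlineReplicas, int maintenanceRedundancy) {
  // At least dataNum + remaining-redundancy replicas must stay online,
  // capped at the total number of replicas in the EC group.
  int required = Math.min(repConfig.getData() + maintenanceRedundancy,
      repConfig.getRequiredNodes());
  return onlineReplicas >= required;
}
```

For example, an EC(3,2) container with the default redundancy of 1 needs 4 of its 5 replicas online before a node hosting one of them may enter maintenance.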
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
index eb8f69a..1e7a8b4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.Scanner;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.protocol.StorageType;
@@ -99,8 +100,16 @@
public static void createKey(OzoneBucket bucket, String keyName,
ReplicationFactor repFactor, ReplicationType repType, String content)
throws IOException {
+ ReplicationConfig repConfig = ReplicationConfig
+ .fromTypeAndFactor(repType, repFactor);
+ createKey(bucket, keyName, repConfig, content);
+ }
+
+ public static void createKey(OzoneBucket bucket, String keyName,
+ ReplicationConfig repConfig, String content)
+ throws IOException {
try (OutputStream stream = bucket
- .createKey(keyName, content.length(), repType, repFactor,
+ .createKey(keyName, content.length(), repConfig,
new HashMap<>())) {
stream.write(content.getBytes(UTF_8));
}
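The new createKey overload accepts any ReplicationConfig directly, which is what the updated decommission and maintenance tests below rely on for EC keys. For example (mirroring the tests below):

```java
// Write an EC 3+2 key through the new overload.
TestDataUtil.createKey(bucket, "ecKey0",
    new ECReplicationConfig(3, 2), "this is the content");
```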
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
index b2c4336..9d83c56 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.ozone.scm.node;
-import org.apache.hadoop.hdds.client.ReplicationFactor;
-import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -57,13 +59,14 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
@@ -90,9 +93,13 @@
private static final Logger LOG =
LoggerFactory.getLogger(TestDecommissionAndMaintenance.class);
- private static int numOfDatanodes = 6;
+ private static int numOfDatanodes = 7;
private static String bucketName = "bucket1";
private static String volName = "vol1";
+ private static RatisReplicationConfig ratisRepConfig =
+ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
+ private static ECReplicationConfig ecRepConfig =
+ new ECReplicationConfig(3, 2);
private OzoneBucket bucket;
private MiniOzoneCluster cluster;
private NodeManager nm;
@@ -121,16 +128,25 @@
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
conf.setTimeDuration(OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
1, SECONDS);
+ conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_CONTAINER_REPLICA_OP_TIME_OUT,
+ 10, SECONDS);
+ conf.setTimeDuration(
+ ScmConfigKeys.OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL,
+ 1, SECONDS);
+ conf.setTimeDuration(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+ 0, SECONDS);
ReplicationManagerConfiguration replicationConf =
conf.getObject(ReplicationManagerConfiguration.class);
replicationConf.setInterval(Duration.ofSeconds(1));
+ replicationConf.setUnderReplicatedInterval(Duration.ofSeconds(1));
+ replicationConf.setOverReplicatedInterval(Duration.ofSeconds(1));
conf.setFromObject(replicationConf);
MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numOfDatanodes);
- clusterProvider = new MiniOzoneClusterProvider(conf, builder, 8);
+ clusterProvider = new MiniOzoneClusterProvider(conf, builder, 7);
}
@AfterAll
@@ -163,15 +179,28 @@
public void testNodeWithOpenPipelineCanBeDecommissionedAndRecommissioned()
throws Exception {
// Generate some data on the empty cluster to create some containers
- generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+ generateData(20, "key", ratisRepConfig);
+ generateData(20, "ecKey", ecRepConfig);
// Locate any container and find its open pipeline
- final ContainerInfo container = waitForAndReturnContainer();
+ final ContainerInfo container =
+ waitForAndReturnContainer(ratisRepConfig, 3);
+ final ContainerInfo ecContainer = waitForAndReturnContainer(ecRepConfig, 5);
Pipeline pipeline = pm.getPipeline(container.getPipelineID());
+ Pipeline ecPipeline = pm.getPipeline(ecContainer.getPipelineID());
assertEquals(Pipeline.PipelineState.OPEN, pipeline.getPipelineState());
- Set<ContainerReplica> replicas = getContainerReplicas(container);
+ assertEquals(Pipeline.PipelineState.OPEN, ecPipeline.getPipelineState());
+ // Find a DN to decommission that is in both the EC and Ratis pipelines.
+ // There are 7 DNs. The EC pipeline needs 5 nodes and the Ratis pipeline 3,
+ // so at least one DN must be in both lists.
+ // Once we have a DN id, look it up in the NM, as the datanodeDetails
+ // instance in the pipeline may not be the same as the one stored in the
+ // NM.
+ final UUID dnID = pipeline.getNodes().stream()
+ .filter(node -> ecPipeline.getNodes().contains(node))
+ .findFirst().get().getUuid();
+ final DatanodeDetails toDecommission = nm.getNodeByUuid(dnID.toString());
- final DatanodeDetails toDecommission = getOneDNHostingReplica(replicas);
scmClient.decommissionNodes(Arrays.asList(
getDNHostAndPort(toDecommission)));
@@ -185,6 +214,8 @@
// Should now be 4 replicas online as the DN is still alive but
// in the DECOMMISSIONED state.
waitForContainerReplicas(container, 4);
+ // In the EC case, there should be 6 online
+ waitForContainerReplicas(ecContainer, 6);
// Stop the decommissioned DN
int dnIndex = cluster.getHddsDatanodeIndex(toDecommission);
@@ -194,6 +225,8 @@
// Now the decommissioned node is dead, we should have
// 3 replicas for the tracked container.
waitForContainerReplicas(container, 3);
+ // In the EC case, there should be 5 online
+ waitForContainerReplicas(ecContainer, 5);
cluster.restartHddsDatanode(dnIndex, true);
scmClient.recommissionNodes(Arrays.asList(
@@ -207,6 +240,8 @@
// when it re-registers it should re-enter the decommission workflow and
// complete decommissioning. If SCM is restarted after decommssion is complete
// then SCM should learn of the decommissioned DN when it registers.
+ // If the DN is then stopped and recommissioned, its state should move to
+ // IN_SERVICE when it is restarted.
public void testDecommissioningNodesCompleteDecommissionOnSCMRestart()
throws Exception {
// First stop the replicationManager so nodes marked for decommission cannot
@@ -215,8 +250,10 @@
// Generate some data and then pick a DN to decommission which is hosting a
// container. This ensures it will not decommission immediately due to
// having no containers.
- generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
- final ContainerInfo container = waitForAndReturnContainer();
+ generateData(20, "key", ratisRepConfig);
+ generateData(20, "ecKey", ecRepConfig);
+ final ContainerInfo container =
+ waitForAndReturnContainer(ratisRepConfig, 3);
final DatanodeDetails dn
= getOneDNHostingReplica(getContainerReplicas(container));
scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
@@ -244,40 +281,24 @@
// Also confirm the datanodeDetails correctly reflect the operational
// state.
waitForDnToReachPersistedOpState(newDn, DECOMMISSIONED);
- }
- @Test
- // If a node was decommissioned, and then stopped so it is dead. Then it is
- // recommissioned in SCM and restarted, the SCM state should be taken as the
- // source of truth and the node will go to the IN_SERVICE state and the state
- // should be updated on the DN.
- public void testStoppedDecommissionedNodeTakesSCMStateOnRestart()
- throws Exception {
- // Decommission node and wait for it to be DECOMMISSIONED
- generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
-
- DatanodeDetails dn = nm.getAllNodes().get(0);
- scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
- waitForDnToReachOpState(dn, DECOMMISSIONED);
- waitForDnToReachPersistedOpState(dn, DECOMMISSIONED);
-
+ // Now stop the DN and recommission it in SCM. When it restarts, it should
+ // take the state held in SCM, which is IN_SERVICE.
int dnIndex = cluster.getHddsDatanodeIndex(dn);
cluster.shutdownHddsDatanode(dnIndex);
waitForDnToReachHealthState(dn, DEAD);
-
// Datanode is shutdown and dead. Now recommission it in SCM
scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
-
// Now restart it and ensure it remains IN_SERVICE
cluster.restartHddsDatanode(dnIndex, true);
- DatanodeDetails newDn = nm.getNodeByUuid(dn.getUuid().toString());
+ newDn = nm.getNodeByUuid(dn.getUuid().toString());
// As this is not an initial registration since SCM was started, the DN
// should report its operational state and if it differs from what SCM
// has, then the SCM state should be used and the DN state updated.
waitForDnToReachHealthState(newDn, HEALTHY);
waitForDnToReachOpState(newDn, IN_SERVICE);
- waitForDnToReachPersistedOpState(dn, IN_SERVICE);
+ waitForDnToReachPersistedOpState(newDn, IN_SERVICE);
}
@Test
@@ -290,15 +311,29 @@
public void testSingleNodeWithOpenPipelineCanGotoMaintenance()
throws Exception {
// Generate some data on the empty cluster to create some containers
- generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+ generateData(20, "key", ratisRepConfig);
+ generateData(20, "ecKey", ecRepConfig);
// Locate any container and find its open pipeline
- final ContainerInfo container = waitForAndReturnContainer();
+ final ContainerInfo container =
+ waitForAndReturnContainer(ratisRepConfig, 3);
+ final ContainerInfo ecContainer =
+ waitForAndReturnContainer(ecRepConfig, 5);
Pipeline pipeline = pm.getPipeline(container.getPipelineID());
+ Pipeline ecPipeline = pm.getPipeline(ecContainer.getPipelineID());
assertEquals(Pipeline.PipelineState.OPEN, pipeline.getPipelineState());
- Set<ContainerReplica> replicas = getContainerReplicas(container);
+ assertEquals(Pipeline.PipelineState.OPEN, ecPipeline.getPipelineState());
+ // Find a DN to put into maintenance that is in both the EC and Ratis
+ // pipelines. There are 7 DNs. The EC pipeline needs 5 nodes and the Ratis
+ // pipeline 3, so at least one DN must be in both lists.
+ // Once we have a DN id, look it up in the NM, as the datanodeDetails
+ // instance in the pipeline may not be the same as the one stored in the
+ // NM.
+ final UUID dnID = pipeline.getNodes().stream()
+ .filter(node -> ecPipeline.getNodes().contains(node))
+ .findFirst().get().getUuid();
+ final DatanodeDetails dn = nm.getNodeByUuid(dnID.toString());
- final DatanodeDetails dn = getOneDNHostingReplica(replicas);
scmClient.startMaintenanceNodes(Arrays.asList(
getDNHostAndPort(dn)), 0);
@@ -309,7 +344,10 @@
// maintenance
Set<ContainerReplica> newReplicas =
cm.getContainerReplicas(container.containerID());
+ Set<ContainerReplica> ecReplicas =
+ cm.getContainerReplicas(ecContainer.containerID());
assertEquals(3, newReplicas.size());
+ assertEquals(5, ecReplicas.size());
// Stop the maintenance DN
cluster.shutdownHddsDatanode(dn);
@@ -318,7 +356,9 @@
// Now the maintenance node is dead, we should still have
// 3 replicas as we don't purge the replicas for a dead maintenance node
newReplicas = cm.getContainerReplicas(container.containerID());
+ ecReplicas = cm.getContainerReplicas(ecContainer.containerID());
assertEquals(3, newReplicas.size());
+ assertEquals(5, ecReplicas.size());
// Restart the DN and it should keep the IN_MAINTENANCE state
cluster.restartHddsDatanode(dn, true);
@@ -347,24 +387,25 @@
}
@Test
- // By default a node can enter maintenance if there are two replicas left
- // available when the maintenance nodes are stopped. Therefore putting all
+ // By default, a node can enter maintenance if there are two replicas left
+ // available when the maintenance nodes are stopped. Therefore, putting all
// nodes hosting a replica to maintenance should cause new replicas to get
// created before the nodes can enter maintenance. When the maintenance nodes
// return, the excess replicas should be removed.
public void testContainerIsReplicatedWhenAllNodesGotoMaintenance()
throws Exception {
// Generate some data on the empty cluster to create some containers
- generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+ generateData(20, "key", ratisRepConfig);
// Locate any container and find its open pipeline
- final ContainerInfo container = waitForAndReturnContainer();
+ final ContainerInfo container =
+ waitForAndReturnContainer(ratisRepConfig, 3);
Set<ContainerReplica> replicas = getContainerReplicas(container);
List<DatanodeDetails> forMaintenance = new ArrayList<>();
replicas.forEach(r -> forMaintenance.add(r.getDatanodeDetails()));
scmClient.startMaintenanceNodes(forMaintenance.stream()
- .map(d -> getDNHostAndPort(d))
+ .map(this::getDNHostAndPort)
.collect(Collectors.toList()), 0);
// Ensure all 3 DNs go to maintenance
@@ -385,8 +426,32 @@
for (DatanodeDetails dn : forMaintenance) {
waitForDnToReachOpState(dn, IN_SERVICE);
}
-
waitForContainerReplicas(container, 3);
+
+ // Now write some EC data and put two of the nodes hosting the EC container
+ // into maintenance. With 2 of the 5 replicas offline only 3 remain online,
+ // but dataNum (3) + remaining redundancy (1) = 4 must stay available, so
+ // at least 1 extra replica should get created, giving 6 or more in total.
+ generateData(20, "eckey", ecRepConfig);
+ final ContainerInfo ecContainer =
+ waitForAndReturnContainer(ecRepConfig, 5);
+ List<DatanodeDetails> ecMaintenance = getContainerReplicas(ecContainer)
+ .stream()
+ .map(ContainerReplica::getDatanodeDetails)
+ .limit(2)
+ .collect(Collectors.toList());
+ scmClient.startMaintenanceNodes(ecMaintenance.stream()
+ .map(this::getDNHostAndPort)
+ .collect(Collectors.toList()), 0);
+ for (DatanodeDetails dn : ecMaintenance) {
+ waitForDnToReachPersistedOpState(dn, IN_MAINTENANCE);
+ }
+ assertTrue(cm.getContainerReplicas(ecContainer.containerID()).size() >= 6);
+ scmClient.recommissionNodes(ecMaintenance.stream()
+ .map(this::getDNHostAndPort)
+ .collect(Collectors.toList()));
+ // Ensure the 2 DNs go to IN_SERVICE
+ for (DatanodeDetails dn : ecMaintenance) {
+ waitForDnToReachOpState(dn, IN_SERVICE);
+ }
+ waitForContainerReplicas(ecContainer, 5);
}
@Test
@@ -397,16 +462,17 @@
// Stop Replication Manager to sure no containers are replicated
stopReplicationManager();
// Generate some data on the empty cluster to create some containers
- generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+ generateData(20, "key", ratisRepConfig);
// Locate any container and find its open pipeline
- final ContainerInfo container = waitForAndReturnContainer();
+ final ContainerInfo container =
+ waitForAndReturnContainer(ratisRepConfig, 3);
Set<ContainerReplica> replicas = getContainerReplicas(container);
List<DatanodeDetails> forMaintenance = new ArrayList<>();
replicas.forEach(r -> forMaintenance.add(r.getDatanodeDetails()));
scmClient.startMaintenanceNodes(forMaintenance.stream()
- .map(d -> getDNHostAndPort(d))
+ .map(this::getDNHostAndPort)
.collect(Collectors.toList()), 0);
// Ensure all 3 DNs go to entering_maintenance
@@ -440,8 +506,8 @@
public void testMaintenanceEndsAutomaticallyAtTimeout()
throws Exception {
// Generate some data on the empty cluster to create some containers
- generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
- ContainerInfo container = waitForAndReturnContainer();
+ generateData(20, "key", ratisRepConfig);
+ ContainerInfo container = waitForAndReturnContainer(ratisRepConfig, 3);
DatanodeDetails dn =
getOneDNHostingReplica(getContainerReplicas(container));
@@ -482,8 +548,8 @@
public void testSCMHandlesRestartForMaintenanceNode()
throws Exception {
// Generate some data on the empty cluster to create some containers
- generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
- ContainerInfo container = waitForAndReturnContainer();
+ generateData(20, "key", ratisRepConfig);
+ ContainerInfo container = waitForAndReturnContainer(ratisRepConfig, 3);
DatanodeDetails dn =
getOneDNHostingReplica(getContainerReplicas(container));
@@ -547,14 +613,13 @@
* Generates some data on the cluster so the cluster has some containers.
* @param keyCount The number of keys to create
* @param keyPrefix The prefix to use for the key name.
- * @param repFactor The replication Factor for the keys
- * @param repType The replication Type for the keys
+ * @param replicationConfig The replication config for the keys
* @throws IOException
*/
private void generateData(int keyCount, String keyPrefix,
- ReplicationFactor repFactor, ReplicationType repType) throws IOException {
+ ReplicationConfig replicationConfig) throws IOException {
for (int i = 0; i < keyCount; i++) {
- TestDataUtil.createKey(bucket, keyPrefix + i, repFactor, repType,
+ TestDataUtil.createKey(bucket, keyPrefix + i, replicationConfig,
"this is the content");
}
}
@@ -667,10 +732,18 @@
* @return A single container present on the cluster
* @throws Exception
*/
- private ContainerInfo waitForAndReturnContainer() throws Exception {
- final ContainerInfo container = cm.getContainers().get(0);
- // Ensure all 3 replicas of the container have been reported via ICR
- waitForContainerReplicas(container, 3);
+ private ContainerInfo waitForAndReturnContainer(ReplicationConfig repConfig,
+ int expectedReplicas) throws Exception {
+ List<ContainerInfo> containers = cm.getContainers();
+ ContainerInfo container = null;
+ for (ContainerInfo c : containers) {
+ if (c.getReplicationConfig().equals(repConfig)) {
+ container = c;
+ break;
+ }
+ }
+ // Ensure the expected number of replicas have been reported via ICR
+ waitForContainerReplicas(container, expectedReplicas);
return container;
}