HDDS-7022. EC: Open EC container are not closed when SCM container was already closed. (#3668)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 79cf324..5b8d6da 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -42,7 +43,10 @@
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.ExitUtil;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
@@ -53,6 +57,7 @@
import java.time.Clock;
import java.time.Duration;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -64,6 +69,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
@@ -401,6 +407,16 @@
return new ContainerHealthResult.HealthyResult(containerInfo);
}
+ if (containerInfo.getState() == HddsProtos.LifeCycleState.CLOSED) {
+ List<ContainerReplica> unhealthyReplicas = replicas.stream()
+ .filter(r -> !compareState(containerInfo.getState(), r.getState()))
+ .collect(Collectors.toList());
+
+ if (unhealthyReplicas.size() > 0) {
+ handleUnhealthyReplicas(containerInfo, unhealthyReplicas);
+ }
+ }
+
List<ContainerReplicaOp> pendingOps =
containerReplicaPendingOps.getPendingOps(containerID);
ContainerHealthResult health = ecContainerHealthCheck
@@ -433,6 +449,67 @@
}
/**
+ * Handles unhealthy container.
+   * A container is inconsistent if any replica's state doesn't
+ * match the container state. We have to take appropriate action
+ * based on state of the replica.
+ *
+ * @param container ContainerInfo
+ * @param unhealthyReplicas List of ContainerReplica
+ */
+ private void handleUnhealthyReplicas(final ContainerInfo container,
+ List<ContainerReplica> unhealthyReplicas) {
+ Iterator<ContainerReplica> iterator = unhealthyReplicas.iterator();
+ while (iterator.hasNext()) {
+ final ContainerReplica replica = iterator.next();
+ final ContainerReplicaProto.State state = replica.getState();
+ if (state == State.OPEN || state == State.CLOSING) {
+ sendCloseCommand(container, replica.getDatanodeDetails(), true);
+ iterator.remove();
+ }
+ }
+ }
+
+ /**
+ * Sends close container command for the given container to the given
+ * datanode.
+ *
+ * @param container Container to be closed
+ * @param datanode The datanode on which the container
+ * has to be closed
+ * @param force Should be set to true if we want to force close.
+ */
+ private void sendCloseCommand(final ContainerInfo container,
+ final DatanodeDetails datanode, final boolean force) {
+
+ ContainerID containerID = container.containerID();
+ LOG.info("Sending close container command for container {}" +
+ " to datanode {}.", containerID, datanode);
+ CloseContainerCommand closeContainerCommand =
+ new CloseContainerCommand(container.getContainerID(),
+ container.getPipelineID(), force);
+ try {
+ closeContainerCommand.setTerm(scmContext.getTermOfLeader());
+ } catch (NotLeaderException nle) {
+ LOG.warn("Skip sending close container command,"
+ + " since current SCM is not leader.", nle);
+ return;
+ }
+ closeContainerCommand.setEncodedToken(getContainerToken(containerID));
+ eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+ new CommandForDatanode<>(datanode.getUuid(), closeContainerCommand));
+ }
+
+ private String getContainerToken(ContainerID containerID) {
+ if (scmContext.getScm() instanceof StorageContainerManager) {
+ StorageContainerManager scm =
+ (StorageContainerManager) scmContext.getScm();
+ return scm.getContainerTokenGenerator().generateEncodedToken(containerID);
+ }
+ return ""; // unit test
+ }
+
+ /**
* Creates a priority queue of UnderReplicatedHealthResult, where the elements
* are ordered by the weighted redundancy of the container. This means that
* containers with the least remaining redundancy are at the front of the
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
index da49bef..38ec64c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
@@ -23,15 +23,17 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.utils.HAUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.BucketArgs;
@@ -43,9 +45,9 @@
import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.ozone.test.GenericTestUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
@@ -54,6 +56,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
@@ -78,7 +81,7 @@
/**
* Create a MiniDFSCluster for testing.
*/
- @BeforeClass
+ @BeforeAll
public static void init() throws Exception {
chunkSize = 1024;
flushSize = 2 * chunkSize;
@@ -125,7 +128,7 @@
/**
* Shutdown MiniDFSCluster.
*/
- @AfterClass
+ @AfterAll
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
@@ -180,30 +183,30 @@
List<ContainerInfo> containers =
cluster.getStorageContainerManager().getContainerManager()
.getContainers();
- long containerID = 0;
+ ContainerInfo container = null;
for (ContainerInfo info : containers) {
if (info.getPipelineID().getId().equals(pipeline.getId().getId())) {
- containerID = info.containerID().getProtobuf().getId();
+ container = info;
}
}
StorageContainerManager scm = cluster.getStorageContainerManager();
- StorageContainerLocationProtocol scmContainerClient =
- HAUtils.getScmContainerClient(cluster.getConf());
- scmContainerClient.closeContainer(containerID);
- // Make sure replica closed.
- waitForDNContainerState(ContainerProtos.ContainerDataProto.State.CLOSED,
- pipeline, containerID);
+
+ // Shutting down the DN triggers close pipeline and close container.
+ cluster.shutdownHddsDatanode(pipeline.getFirstNode());
+
+ // Make sure container closed.
+ waitForSCMContainerState(StorageContainerDatanodeProtocolProtos
+ .ContainerReplicaProto.State.CLOSED, container.containerID());
//Temporarily stop the RM process.
scm.getReplicationManager().stop();
- // Stop the DN and wait for the lower replication.
- cluster.shutdownHddsDatanode(pipeline.getFirstNode());
- waitForContainerCount(4, containerID, scm);
+ // Wait for the lower replication.
+ waitForContainerCount(4, container.containerID(), scm);
// Start the RM to resume the replication process and wait for the
// reconstruction.
scm.getReplicationManager().start();
- waitForContainerCount(5, containerID, scm);
+ waitForContainerCount(5, container.containerID(), scm);
// Let's verify for Over replications now.
//Temporarily stop the RM process.
@@ -213,35 +216,56 @@
// increased.
cluster.restartHddsDatanode(pipeline.getFirstNode(), true);
// Check container is over replicated.
- waitForContainerCount(6, containerID, scm);
+ waitForContainerCount(6, container.containerID(), scm);
+ // Wait for all the replicas to be closed.
+ container = scm.getContainerInfo(container.getContainerID());
+ waitForDNContainerState(container, scm);
// Resume RM and wait the over replicated replica deleted.
scm.getReplicationManager().start();
- waitForContainerCount(5, containerID, scm);
+ waitForContainerCount(5, container.containerID(), scm);
}
- private void waitForDNContainerState(
- ContainerProtos.ContainerDataProto.State state, Pipeline pipeline,
- long containerID) throws TimeoutException, InterruptedException {
- //Wait until container closed at DN
+ private void waitForDNContainerState(ContainerInfo container,
+ StorageContainerManager scm) throws InterruptedException,
+ TimeoutException {
GenericTestUtils.waitFor(() -> {
try {
- return cluster.getHddsDatanode(pipeline.getFirstNode())
- .getDatanodeStateMachine().getContainer().getContainerSet()
- .getContainer(containerID).getContainerState() == state;
+ List<ContainerReplica> unhealthyReplicas = scm.getContainerManager()
+ .getContainerReplicas(container.containerID()).stream()
+ .filter(r -> !ReplicationManager
+ .compareState(container.getState(), r.getState()))
+ .collect(Collectors.toList());
+ return unhealthyReplicas.size() == 0;
+ } catch (ContainerNotFoundException e) {
+ return false;
+ }
+ }, 100, 100000);
+ }
+
+ private void waitForSCMContainerState(
+ StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State state,
+ ContainerID containerID) throws TimeoutException, InterruptedException {
+ //Wait until container closed at SCM
+ GenericTestUtils.waitFor(() -> {
+ try {
+ HddsProtos.LifeCycleState containerState = cluster
+ .getStorageContainerManager().getContainerManager()
+ .getContainer(containerID).getState();
+ return ReplicationManager.compareState(containerState, state);
} catch (IOException e) {
return false;
}
}, 100, 100000);
}
- private void waitForContainerCount(int count, long containerID,
+ private void waitForContainerCount(int count, ContainerID containerID,
StorageContainerManager scm)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
try {
return scm.getContainerManager()
- .getContainerReplicas(ContainerID.valueOf(containerID))
+ .getContainerReplicas(containerID)
.size() == count;
} catch (ContainerNotFoundException e) {
return false;