HDDS-6978. EC: Cleanup RECOVERING container on DN restarts (#3585)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
index 2a88a2f..573cee9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
@@ -34,6 +34,8 @@
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerDataProto.State.RECOVERING;
 
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.slf4j.Logger;
@@ -73,17 +75,18 @@
   private final ConfigurationSource config;
   private final File hddsVolumeDir;
   private final MutableVolumeSet volumeSet;
+  private final boolean shouldDeleteRecovering;
 
   public ContainerReader(
       MutableVolumeSet volSet, HddsVolume volume, ContainerSet cset,
-      ConfigurationSource conf
-  ) {
+      ConfigurationSource conf, boolean shouldDeleteRecovering) {
     Preconditions.checkNotNull(volume);
     this.hddsVolume = volume;
     this.hddsVolumeDir = hddsVolume.getHddsRootDir();
     this.containerSet = cset;
     this.config = conf;
     this.volumeSet = volSet;
+    this.shouldDeleteRecovering = shouldDeleteRecovering;
   }
 
   @Override
@@ -207,6 +210,14 @@
         KeyValueContainerUtil.parseKVContainerData(kvContainerData, config);
         KeyValueContainer kvContainer = new KeyValueContainer(kvContainerData,
             config);
+        if (kvContainer.getContainerState() == RECOVERING) {
+          if (shouldDeleteRecovering) {
+            kvContainer.delete();
+            LOG.info("Delete recovering container {}.",
+                kvContainer.getContainerData().getContainerID());
+          }
+          return;
+        }
         containerSet.addContainer(kvContainer);
       } else {
         throw new StorageContainerException("Container File is corrupted. " +
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index df61cac..1db2961 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -245,7 +245,7 @@
     while (volumeSetIterator.hasNext()) {
       StorageVolume volume = volumeSetIterator.next();
       Thread thread = new Thread(new ContainerReader(volumeSet,
-          (HddsVolume) volume, containerSet, config));
+          (HddsVolume) volume, containerSet, config, true));
       thread.start();
       volumeThreads.add(thread);
     }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
index 2ab6f95..4f633b1 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
@@ -22,8 +22,8 @@
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
@@ -57,6 +57,7 @@
 import java.util.List;
 import java.util.UUID;
 
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.RECOVERING;
 import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -216,13 +217,26 @@
 
   @Test
   public void testContainerReader() throws Exception {
+    KeyValueContainerData recoveringContainerData = new KeyValueContainerData(
+        10, layout, (long) StorageUnit.GB.toBytes(5),
+        UUID.randomUUID().toString(), datanodeId.toString());
+    //create a container with recovering state
+    recoveringContainerData.setState(RECOVERING);
+
+    KeyValueContainer recoveringKeyValueContainer =
+        new KeyValueContainer(recoveringContainerData,
+            conf);
+    recoveringKeyValueContainer.create(
+        volumeSet, volumeChoosingPolicy, clusterId);
+
     ContainerReader containerReader = new ContainerReader(volumeSet,
-        hddsVolume, containerSet, conf);
+        hddsVolume, containerSet, conf, true);
 
     Thread thread = new Thread(containerReader);
     thread.start();
     thread.join();
 
+    //recovering container should be deleted, so the count should be 2
     Assert.assertEquals(2, containerSet.containerCount());
 
     for (int i = 0; i < 2; i++) {
@@ -284,7 +298,7 @@
     ContainerCache.getInstance(conf).shutdownCache();
 
     ContainerReader containerReader = new ContainerReader(volumeSet1,
-        hddsVolume1, containerSet1, conf);
+        hddsVolume1, containerSet1, conf, true);
     containerReader.readVolume(hddsVolume1.getHddsRootDir());
     Assert.assertEquals(containerCount - 1, containerSet1.containerCount());
   }
@@ -346,7 +360,7 @@
     Thread[] threads = new Thread[volumeNum];
     for (int i = 0; i < volumeNum; i++) {
       containerReaders[i] = new ContainerReader(volumeSets,
-          (HddsVolume) volumes.get(i), containerSet, conf);
+          (HddsVolume) volumes.get(i), containerSet, conf, true);
       threads[i] = new Thread(containerReaders[i]);
     }
     long startTime = System.currentTimeMillis();
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
index c5b21d6..453370e 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
@@ -151,7 +151,7 @@
       HddsVolume volume = volumeSetIterator.next();
       LOG.info("Loading container metadata from volume " + volume.toString());
       final ContainerReader reader =
-          new ContainerReader(volumeSet, volume, containerSet, conf);
+          new ContainerReader(volumeSet, volume, containerSet, conf, false);
       reader.run();
     }