HDDS-4911. List container by container state. (#2001)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index 58aa0e6..abb0be2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -106,6 +106,19 @@
       int count) throws IOException;
 
   /**
+   * Lists a range of containers and get their info.
+   *
+   * @param startContainerID start containerID.
+   * @param count count must be {@literal >} 0.
+   * @param state Container of this state will be returned.
+   *
+   * @return a list of pipeline.
+   * @throws IOException
+   */
+  List<ContainerInfo> listContainer(long startContainerID,
+      int count, HddsProtos.LifeCycleState state) throws IOException;
+
+  /**
    * Read meta data from an existing container.
    * @param containerID - ID of the container.
    * @param pipeline - Pipeline where the container is located.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 20e5f6d..aa14a27 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -104,8 +104,27 @@
    * @return a list of container.
    * @throws IOException
    */
-  List<ContainerInfo> listContainer(long startContainerID, int count)
-      throws IOException;
+  List<ContainerInfo> listContainer(long startContainerID,
+      int count) throws IOException;
+
+  /**
+   * Ask SCM a list of containers with a range of container names
+   * and the limit of count.
+   * Search container names between start name(exclusive), and
+   * use prefix name to filter the result. the max size of the
+   * searching range cannot exceed the value of count.
+   *
+   * @param startContainerID start container ID.
+   * @param count count, if count {@literal <} 0, the max size is unlimited.(
+   *              Usually the count will be replace with a very big
+   *              value instead of being unlimited in case the db is very big)
+   * @param state Container with this state will be returned.
+   *
+   * @return a list of container.
+   * @throws IOException
+   */
+  List<ContainerInfo> listContainer(long startContainerID,
+      int count, HddsProtos.LifeCycleState state) throws IOException;
 
   /**
    * Deletes a container in SCM.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index dbc2a97..117e973 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -631,8 +631,8 @@
     try {
       if (kvContainer.getContainerData().getState() == State.UNHEALTHY) {
         throw new StorageContainerException(
-            "The container replica is unhealthy.",
-            CONTAINER_UNHEALTHY);
+            "The container(" + kvContainer.getContainerData().getContainerID() +
+            ") replica is unhealthy.", CONTAINER_UNHEALTHY);
       }
     } finally {
       kvContainer.readUnlock();
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index f46e6e2..24fdf0d 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -251,6 +251,12 @@
   @Override
   public List<ContainerInfo> listContainer(long startContainerID, int count)
       throws IOException {
+    return listContainer(startContainerID, count, null);
+  }
+
+  @Override
+  public List<ContainerInfo> listContainer(long startContainerID, int count,
+      HddsProtos.LifeCycleState state) throws IOException {
     Preconditions.checkState(startContainerID >= 0,
         "Container ID cannot be negative.");
     Preconditions.checkState(count > 0,
@@ -260,6 +266,10 @@
     builder.setStartContainerID(startContainerID);
     builder.setCount(count);
     builder.setTraceID(TracingUtil.exportCurrentSpan());
+    if (state != null) {
+      builder.setState(state);
+    }
+
     SCMListContainerRequestProto request = builder.build();
 
     SCMListContainerResponseProto response =
@@ -272,7 +282,6 @@
       containerList.add(ContainerInfo.fromProtobuf(containerInfoProto));
     }
     return containerList;
-
   }
 
   /**
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index 3be47ea..7330f2e 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -212,6 +212,7 @@
   required uint32 count = 1;
   optional uint64 startContainerID = 2;
   optional string traceID = 3;
+  optional LifeCycleState state = 4;
 }
 
 message SCMListContainerResponseProto {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
index 0e1c98f..f13f894 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -95,6 +95,25 @@
   List<ContainerInfo> listContainer(ContainerID startContainerID, int count);
 
   /**
+   * Returns containers under certain conditions.
+   * Search container IDs from start ID(exclusive),
+   * The max size of the searching range cannot exceed the
+   * value of count.
+   *
+   * @param startContainerID start containerID, >=0,
+   * start searching at the head if 0.
+   * @param count count must be >= 0
+   *              Usually the count will be replace with a very big
+   *              value instead of being unlimited in case the db is very big.
+   * @param state Container of this state will be returned. Can be null.
+   *
+   * @return a list of container.
+   * @throws IOException
+   */
+  List<ContainerInfo> listContainer(ContainerID startContainerID, int count,
+      HddsProtos.LifeCycleState state);
+
+  /**
    * Allocates a new container for a given keyName and replication factor.
    *
    * @param replicationFactor - replication factor of the container.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 4d96465..849a891 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -230,14 +230,27 @@
   @Override
   public List<ContainerInfo> listContainer(ContainerID startContainerID,
       int count) {
+    return listContainer(startContainerID, count, null);
+  }
+
+  @Override
+  public List<ContainerInfo> listContainer(ContainerID startContainerID,
+      int count, HddsProtos.LifeCycleState state) {
     lock.lock();
     try {
       scmContainerManagerMetrics.incNumListContainersOps();
+      List<ContainerID> containersIds;
+      if (state == null) {
+        containersIds =
+            new ArrayList<>(containerStateManager.getAllContainerIDs());
+      } else {
+        containersIds = new ArrayList<>(
+            containerStateManager.getContainerIDsByState(state));
+      }
+      Collections.sort(containersIds);
+
       final long startId = startContainerID == null ?
           0 : startContainerID.getId();
-      final List<ContainerID> containersIds =
-          new ArrayList<>(containerStateManager.getAllContainerIDs());
-      Collections.sort(containersIds);
 
       return containersIds.stream()
           .filter(id -> id.getId() > startId)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index bd73d36..af8db21 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -376,8 +376,12 @@
       startContainerID = request.getStartContainerID();
     }
     count = request.getCount();
+    HddsProtos.LifeCycleState state = null;
+    if (request.hasState()) {
+      state = request.getState();
+    }
     List<ContainerInfo> containerList =
-        impl.listContainer(startContainerID, count);
+        impl.listContainer(startContainerID, count, state);
     SCMListContainerResponseProto.Builder builder =
         SCMListContainerResponseProto.newBuilder();
     for (ContainerInfo container : containerList) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index e1334e7..b8f7fbc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -330,13 +330,41 @@
     }
   }
 
+  /**
+   * Lists a range of containers and get their info.
+   *
+   * @param startContainerID start containerID.
+   * @param count count must be {@literal >} 0.
+   *
+   * @return a list of pipeline.
+   * @throws IOException
+   */
   @Override
   public List<ContainerInfo> listContainer(long startContainerID,
       int count) throws IOException {
+    return listContainer(startContainerID, count, null);
+  }
+
+  /**
+   * Lists a range of containers and get their info.
+   *
+   * @param startContainerID start containerID.
+   * @param count count must be {@literal >} 0.
+   * @param state Container with this state will be returned.
+   *
+   * @return a list of pipeline.
+   * @throws IOException
+   */
+  @Override
+  public List<ContainerInfo> listContainer(long startContainerID,
+      int count, HddsProtos.LifeCycleState state) throws IOException {
     boolean auditSuccess = true;
     Map<String, String> auditMap = Maps.newHashMap();
     auditMap.put("startContainerID", String.valueOf(startContainerID));
     auditMap.put("count", String.valueOf(count));
+    if (state != null) {
+      auditMap.put("state", state.name());
+    }
     try {
       // To allow startcontainerId to take the value "0",
       // "null" is assigned, so that its handled in the
@@ -344,7 +372,7 @@
       final ContainerID containerId = startContainerID != 0 ? ContainerID
           .valueof(startContainerID) : null;
       return scm.getContainerManager().
-          listContainer(containerId, count);
+          listContainer(containerId, count, state);
     } catch (Exception ex) {
       auditSuccess = false;
       AUDIT.logReadFailure(
@@ -356,7 +384,6 @@
             buildAuditMessageForSuccess(SCMAction.LIST_CONTAINER, auditMap));
       }
     }
-
   }
 
   @Override
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/cli/OzoneAdmin.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/cli/OzoneAdmin.java
index aca8a4c..c3e0557 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/cli/OzoneAdmin.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/cli/OzoneAdmin.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.cli;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.util.NativeCodeLoader;
 
 import org.apache.log4j.ConsoleAppender;
@@ -27,6 +28,8 @@
 import org.apache.log4j.PatternLayout;
 import picocli.CommandLine;
 
+import java.util.function.Supplier;
+
 /**
  * Ozone Admin Command line tool.
  */
@@ -64,4 +67,14 @@
 
     new OzoneAdmin().run(argv);
   }
+
+  @Override
+  public void execute(String[] argv) {
+    TracingUtil.initTracing("shell", createOzoneConfiguration());
+    TracingUtil.executeInNewSpan("main",
+        (Supplier<Void>) () -> {
+          super.execute(argv);
+          return null;
+        });
+  }
 }
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index c96b9fe..06aba3b 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -394,6 +394,13 @@
         startContainerID, count);
   }
 
+  @Override
+  public List<ContainerInfo> listContainer(long startContainerID,
+      int count, HddsProtos.LifeCycleState state) throws IOException {
+    return storageContainerLocationClient.listContainer(
+        startContainerID, count, state);
+  }
+
   /**
    * Get meta data from an existing container.
    *
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ListSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ListSubcommand.java
index e9b0b7d..74e6184 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ListSubcommand.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ListSubcommand.java
@@ -21,6 +21,7 @@
 import java.util.List;
 
 import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.cli.ScmSubcommand;
 import org.apache.hadoop.hdds.scm.client.ScmClient;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -59,6 +60,11 @@
       defaultValue = "20", showDefaultValue = Visibility.ALWAYS)
   private int count;
 
+  @Option(names = {"--state"},
+      description = "Container state(OPEN, CLOSING, QUASI_CLOSED, CLOSED, " +
+          "DELETING, DELETED)")
+  private HddsProtos.LifeCycleState state;
+
   private static final ObjectWriter WRITER;
 
   static {
@@ -81,7 +87,7 @@
   @Override
   public void execute(ScmClient scmClient) throws IOException {
     List<ContainerInfo> containerList =
-        scmClient.listContainer(startId, count);
+        scmClient.listContainer(startId, count, state);
 
     // Output data list
     for (ContainerInfo container : containerList) {
diff --git a/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot b/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
index 80edfd1..b3d1c33 100644
--- a/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
@@ -43,6 +43,10 @@
     ${output} =         Execute          ozone admin container list --scm scm
                         Should contain   ${output}   OPEN
 
+List containers with container state
+    ${output} =         Execute          ozone admin container list --state=CLOSED
+                        Should Not contain   ${output}   OPEN
+
 Container info
     ${output} =         Execute          ozone admin container info "${CONTAINER}"
                         Should contain   ${output}   Container id: ${CONTAINER}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java
index a9b093c..4d86609 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java
@@ -30,6 +30,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdds.cli.GenericCli;
+import org.apache.hadoop.hdds.cli.OzoneAdmin;
 import org.apache.hadoop.ozone.OFSPath;
 import org.apache.hadoop.fs.ozone.OzoneFsShell;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -90,6 +92,7 @@
   private static OzoneConfiguration conf = null;
   private static MiniOzoneCluster cluster = null;
   private static OzoneShell ozoneShell = null;
+  private static OzoneAdmin ozoneAdminShell = null;
 
   private final ByteArrayOutputStream out = new ByteArrayOutputStream();
   private final ByteArrayOutputStream err = new ByteArrayOutputStream();
@@ -121,6 +124,7 @@
     testFile.createNewFile();
 
     ozoneShell = new OzoneShell();
+    ozoneAdminShell = new OzoneAdmin();
 
     // Init HA cluster
     omServiceId = "om-service-test1";
@@ -168,7 +172,7 @@
     System.setErr(OLD_ERR);
   }
 
-  private void execute(Shell shell, String[] args) {
+  private void execute(GenericCli shell, String[] args) {
     LOG.info("Executing OzoneShell command with args {}", Arrays.asList(args));
     CommandLine cmd = shell.getCmd();
 
@@ -455,6 +459,28 @@
   }
 
   /**
+   * Test ozone admin list command.
+   */
+  @Test
+  public void testOzoneAdminCmdList() throws UnsupportedEncodingException {
+    // Part of listing keys test.
+    generateKeys("/volume6", "/bucket");
+    // Test case 1: list OPEN container
+    String state = "--state=OPEN";
+    String[] args = new String[] {"container", "list", "--scm",
+        "localhost:" + cluster.getStorageContainerManager().getClientRpcPort(),
+        state};
+    execute(ozoneAdminShell, args);
+
+    // Test case 2: list CLOSED container
+    state = "--state=CLOSED";
+    args = new String[] {"container", "list", "--scm",
+        "localhost:" + cluster.getStorageContainerManager().getClientRpcPort(),
+        state};
+    execute(ozoneAdminShell, args);
+  }
+
+  /**
    * Helper function to retrieve Ozone client configuration for trash testing.
    * @param hostPrefix Scheme + Authority. e.g. ofs://om-service-test1
    * @param configuration Server config to generate client config from.