[IGNITE-22091] Implement CLI for disaster recovery: partition states (#3668)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 1254482..217eac8 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -612,8 +612,8 @@
         /** Disaster recovery group. */
         public static final ErrorGroup RECOVERY_ERR_GROUP = registerGroup("RECOVERY", (short) 20);
 
-        /** Partitions were not found. */
-        public static final int PARTITIONS_NOT_FOUND_ERR = RECOVERY_ERR_GROUP.registerErrorCode((short) 1);
+        /** Partition ID is not in valid range. */
+        public static final int ILLEGAL_PARTITION_ID_ERR = RECOVERY_ERR_GROUP.registerErrorCode((short) 1);
 
         /** Nodes were not found. */
         public static final int NODES_NOT_FOUND_ERR = RECOVERY_ERR_GROUP.registerErrorCode((short) 2);
diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
index eb13a09..62a9ce7 100644
--- a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
+++ b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
@@ -27,6 +27,7 @@
 import java.io.StringWriter;
 import java.io.Writer;
 import java.util.List;
+import java.util.Set;
 import org.apache.ignite.internal.Cluster;
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
 import org.apache.ignite.internal.cli.call.connect.ConnectCall;
@@ -145,18 +146,65 @@
                 .isEqualTo(expectedOutput);
     }
 
+    protected void assertOutputStartsWith(String expectedOutput) {
+        assertThat(sout.toString())
+                .as("Expected command output to start with: " + expectedOutput + " but was " + sout.toString())
+                .startsWith(expectedOutput);
+    }
+
     protected void assertOutputContains(String expectedOutput) {
         assertThat(sout.toString())
                 .as("Expected command output to contain: " + expectedOutput + " but was " + sout.toString())
                 .contains(expectedOutput);
     }
 
+    protected void assertOutputContainsAnyIgnoringCase(Set<String> expectedOutput) {
+        CharSequence[] expectedUpperCase = expectedOutput.stream().map(String::toUpperCase).toArray(CharSequence[]::new);
+
+        assertThat(sout.toString().toUpperCase())
+                .as("Expected command output to contain any of, ignoring case: " + expectedOutput + " but was " + sout.toString())
+                .containsAnyOf(expectedUpperCase);
+    }
+
+    protected void assertOutputContainsAny(Set<String> expectedOutput) {
+
+        assertThat(sout.toString())
+                .as("Expected command output to contain any of: " + expectedOutput + " but was " + sout.toString())
+                .containsAnyOf(expectedOutput.toArray(CharSequence[]::new));
+    }
+
+    protected void assertOutputContainsAllIgnoringCase(Set<String> expectedOutput) {
+        CharSequence[] expectedUpperCase = expectedOutput.stream().map(String::toUpperCase).toArray(CharSequence[]::new);
+
+        assertThat(sout.toString().toUpperCase())
+                .as("Expected command output to contain all of, ignoring case: " + expectedOutput + " but was " + sout.toString())
+                .contains(expectedUpperCase);
+    }
+
+    protected void assertOutputContainsAll(Set<String> expectedOutput) {
+        assertThat(sout.toString())
+                .as("Expected command output to contain all of: " + expectedOutput + " but was " + sout.toString())
+                .contains(expectedOutput.toArray(CharSequence[]::new));
+    }
+
     protected void assertOutputDoesNotContain(String expectedOutput) {
         assertThat(sout.toString())
                 .as("Expected command output to not contain: " + expectedOutput + " but was " + sout.toString())
                 .doesNotContain(expectedOutput);
     }
 
+    protected void assertOutputDoesNotContain(Set<String> expectedOutput) {
+        assertThat(sout.toString())
+                .as("Expected command output to not contain: " + expectedOutput + " but was " + sout.toString())
+                .doesNotContain(expectedOutput.toArray(CharSequence[]::new));
+    }
+
+    protected void assertOutputDoesNotContainIgnoreCase(Set<String> expectedOutput) {
+        assertThat(sout.toString())
+                .as("Expected command output to not contain: " + expectedOutput + " but was " + sout.toString())
+                .doesNotContainIgnoringCase(expectedOutput.toArray(CharSequence[]::new));
+    }
+
     protected void assertOutputMatches(String regex) {
         assertThat(sout.toString())
                 .as("Expected command output to match regex: " + regex + " but it is not: " + sout.toString())
diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/ItPartitionStatesCommandTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/ItPartitionStatesCommandTest.java
new file mode 100644
index 0000000..1b62ee9
--- /dev/null
+++ b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/ItPartitionStatesCommandTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands.recovery;
+
+import java.util.List;
+import org.apache.ignite.internal.cli.commands.recovery.partitions.PartitionStatesCommand;
+import org.apache.ignite.internal.util.CollectionUtils;
+
+/** Test class for {@link PartitionStatesCommand}. */
+public class ItPartitionStatesCommandTest extends ItPartitionStatesTest {
+
+    @Override
+    protected void execute(String... args) {
+        String[] fullArgs = CollectionUtils.concat(List.of("recovery", "partition-states"), List.of(args)).toArray(String[]::new);
+
+        super.execute(fullArgs);
+    }
+}
diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/ItPartitionStatesReplCommandTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/ItPartitionStatesReplCommandTest.java
new file mode 100644
index 0000000..11536dd
--- /dev/null
+++ b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/ItPartitionStatesReplCommandTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands.recovery;
+
+import org.apache.ignite.internal.cli.commands.recovery.partitions.PartitionStatesReplCommand;
+
+/** Test class for {@link PartitionStatesReplCommand}. */
+public class ItPartitionStatesReplCommandTest extends ItPartitionStatesTest {
+
+    @Override
+    protected Class<?> getCommandClass() {
+        return PartitionStatesReplCommand.class;
+    }
+}
diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/ItPartitionStatesTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/ItPartitionStatesTest.java
new file mode 100644
index 0000000..3082f6b
--- /dev/null
+++ b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/ItPartitionStatesTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands.recovery;
+
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.CLUSTER_URL_OPTION;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.PLAIN_OPTION;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_NODE_NAMES_OPTION;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_PARTITION_GLOBAL_OPTION;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_PARTITION_IDS_OPTION;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_PARTITION_LOCAL_OPTION;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_ZONE_NAMES_OPTION;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.cli.CliIntegrationTest;
+import org.apache.ignite.internal.util.CollectionUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/** Base test class for Recovery partition states commands. */
+public abstract class ItPartitionStatesTest extends CliIntegrationTest {
+    private static final int DEFAULT_PARTITION_COUNT = 25;
+
+    private static final Set<String> ZONES = Set.of("first_ZONE", "second_ZONE", "third_ZONE");
+
+    private static final Set<String> MIXED_CASE_ZONES = Set.of("mixed_first_zone", "MIXED_FIRST_ZONE", "mixed_second_zone",
+            "MIXED_SECOND_ZONE");
+
+    private static final Set<String> ZONES_CONTAINING_TABLES = new HashSet<>(CollectionUtils.concat(ZONES, MIXED_CASE_ZONES));
+
+    private static final String EMPTY_ZONE = "empty_ZONE";
+
+    private static final Set<String> STATES = Set.of("HEALTHY", "AVAILABLE");
+
+    private static final int DONT_CHECK_PARTITIONS = -1;
+
+    private static Set<String> nodeNames;
+
+    @BeforeAll
+    public static void createTables() {
+        ZONES_CONTAINING_TABLES.forEach(name -> {
+            sql(String.format("CREATE ZONE \"%s\" WITH storage_profiles='%s'", name, DEFAULT_AIPERSIST_PROFILE_NAME));
+            sql(String.format("CREATE TABLE \"%s_table\" (id INT PRIMARY KEY, val INT) WITH PRIMARY_ZONE = '%1$s'", name));
+        });
+
+        sql(String.format("CREATE ZONE \"%s\" WITH storage_profiles='%s'", EMPTY_ZONE, DEFAULT_AIPERSIST_PROFILE_NAME));
+
+        nodeNames = CLUSTER.runningNodes().map(IgniteImpl::name).collect(toSet());
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testPartitionStates(boolean global) {
+        execute(CLUSTER_URL_OPTION, NODE_URL,
+                global ? RECOVERY_PARTITION_GLOBAL_OPTION : RECOVERY_PARTITION_LOCAL_OPTION,
+                PLAIN_OPTION);
+
+        checkOutput(global, ZONES_CONTAINING_TABLES, nodeNames, DEFAULT_PARTITION_COUNT);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testPartitionStatesByZones(boolean global) {
+        execute(CLUSTER_URL_OPTION, NODE_URL,
+                RECOVERY_ZONE_NAMES_OPTION, String.join(",", ZONES),
+                global ? RECOVERY_PARTITION_GLOBAL_OPTION : RECOVERY_PARTITION_LOCAL_OPTION,
+                PLAIN_OPTION
+        );
+
+        checkOutput(global, ZONES, nodeNames, DEFAULT_PARTITION_COUNT);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testPartitionStatesByPartitions(boolean global) {
+        String partitions = "0,1";
+
+        execute(CLUSTER_URL_OPTION, NODE_URL,
+                RECOVERY_PARTITION_IDS_OPTION, partitions,
+                global ? RECOVERY_PARTITION_GLOBAL_OPTION : RECOVERY_PARTITION_LOCAL_OPTION,
+                PLAIN_OPTION
+        );
+
+        checkOutput(global, ZONES, nodeNames, 2);
+    }
+
+    @Test
+    void testLocalPartitionStatesByNodes() {
+        Set<String> nodes = nodeNames.stream().limit(nodeNames.size() - 1).collect(toSet());
+
+        execute(CLUSTER_URL_OPTION, NODE_URL,
+                RECOVERY_NODE_NAMES_OPTION, String.join(",", nodes),
+                RECOVERY_PARTITION_LOCAL_OPTION,
+                PLAIN_OPTION
+        );
+
+        checkOutput(false, ZONES, nodes, DONT_CHECK_PARTITIONS);
+    }
+
+    @Test
+    void testLocalPartitionStatesByNodesIsCaseSensitive() {
+        Set<String> nodeNames = Set.of(CLUSTER.node(0).node().name(), CLUSTER.node(1).node().name());
+
+        String url = "state/local?nodeNames=" + String.join(",", nodeNames).toUpperCase();
+
+        execute(CLUSTER_URL_OPTION, NODE_URL,
+                RECOVERY_NODE_NAMES_OPTION, String.join(",", nodeNames).toUpperCase(),
+                RECOVERY_PARTITION_LOCAL_OPTION,
+                PLAIN_OPTION
+        );
+
+        assertErrOutputContains("Some nodes are missing: ");
+
+        nodeNames.forEach(name -> assertErrOutputContains(name.toUpperCase()));
+
+        assertOutputIsEmpty();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testPartitionStatesZonesMixedCase(boolean global) {
+        execute(CLUSTER_URL_OPTION, NODE_URL,
+                RECOVERY_ZONE_NAMES_OPTION, String.join(",", MIXED_CASE_ZONES),
+                global ? RECOVERY_PARTITION_GLOBAL_OPTION : RECOVERY_PARTITION_LOCAL_OPTION,
+                PLAIN_OPTION
+        );
+
+        checkOutput(global, MIXED_CASE_ZONES, nodeNames, DEFAULT_PARTITION_COUNT);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testPartitionStatesMissingZone(boolean global) {
+        String unknownZone = "UNKNOWN_ZONE";
+
+        execute(CLUSTER_URL_OPTION, NODE_URL,
+                RECOVERY_ZONE_NAMES_OPTION, unknownZone,
+                global ? RECOVERY_PARTITION_GLOBAL_OPTION : RECOVERY_PARTITION_LOCAL_OPTION,
+                PLAIN_OPTION
+        );
+
+        assertErrOutputContains("Some distribution zones are missing: [UNKNOWN_ZONE]");
+
+        assertOutputIsEmpty();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testLocalPartitionStatesNegativePartition(boolean global) {
+        String partitions = "1,-100,0";
+
+        execute(CLUSTER_URL_OPTION, NODE_URL,
+                RECOVERY_PARTITION_IDS_OPTION, partitions,
+                global ? RECOVERY_PARTITION_GLOBAL_OPTION : RECOVERY_PARTITION_LOCAL_OPTION,
+                PLAIN_OPTION
+        );
+
+        assertErrOutputContains("Partition ID can't be negative, found: -100");
+
+        assertOutputIsEmpty();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testLocalPartitionStatesPartitionOutOfRange(boolean global) {
+        String partitions = "0,1," + DEFAULT_PARTITION_COUNT;
+        String zoneName = ZONES_CONTAINING_TABLES.stream().findAny().get();
+
+        execute(CLUSTER_URL_OPTION, NODE_URL,
+                RECOVERY_PARTITION_IDS_OPTION, partitions,
+                RECOVERY_ZONE_NAMES_OPTION, zoneName,
+                global ? RECOVERY_PARTITION_GLOBAL_OPTION : RECOVERY_PARTITION_LOCAL_OPTION,
+                PLAIN_OPTION
+        );
+
+        assertErrOutputContains(String.format(
+                "Partition IDs should be in range [0, %d] for zone %s, found: %d",
+                DEFAULT_PARTITION_COUNT - 1,
+                zoneName,
+                DEFAULT_PARTITION_COUNT
+        ));
+
+        assertOutputIsEmpty();
+    }
+
+    @Test
+    void testPartitionStatesMissingNode() {
+        String unknownNode = "unknown_node";
+
+        execute(CLUSTER_URL_OPTION, NODE_URL,
+                RECOVERY_NODE_NAMES_OPTION, unknownNode,
+                RECOVERY_PARTITION_LOCAL_OPTION, PLAIN_OPTION
+        );
+
+        assertErrOutputContains("Some nodes are missing: [unknown_node]");
+
+        assertOutputIsEmpty();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testPartitionStatesEmptyResult(boolean global) {
+        execute(CLUSTER_URL_OPTION, NODE_URL,
+                RECOVERY_ZONE_NAMES_OPTION, EMPTY_ZONE,
+                global ? RECOVERY_PARTITION_GLOBAL_OPTION : RECOVERY_PARTITION_LOCAL_OPTION,
+                PLAIN_OPTION
+        );
+
+        checkOutput(global, Set.of(), Set.of(), 0);
+    }
+
+    @Test
+    void testOutputFormatGlobal() {
+        String zoneName = ZONES.stream().findAny().get();
+
+        execute(CLUSTER_URL_OPTION, NODE_URL,
+                RECOVERY_PARTITION_GLOBAL_OPTION,
+                RECOVERY_ZONE_NAMES_OPTION, zoneName,
+                RECOVERY_PARTITION_IDS_OPTION, "1",
+                PLAIN_OPTION);
+
+        assertErrOutputIsEmpty();
+        assertOutputMatches(String.format(
+                "Zone name\tTable name\tPartition ID\tState\\r?\\n%1$s\t%1$s_table\t1\t(HEALTHY|AVAILABLE)\\r?\\n",
+                zoneName));
+    }
+
+    @Test
+    void testOutputFormatLocal() {
+        String zoneName = ZONES.stream().findAny().get();
+
+        String possibleNodeNames = String.join("|", nodeNames);
+
+        execute(CLUSTER_URL_OPTION, NODE_URL,
+                RECOVERY_ZONE_NAMES_OPTION, zoneName,
+                RECOVERY_PARTITION_IDS_OPTION, "1",
+                RECOVERY_PARTITION_LOCAL_OPTION,
+                PLAIN_OPTION
+        );
+
+        assertErrOutputIsEmpty();
+
+        assertOutputMatches(String.format(
+                "Node name\tZone name\tTable name\tPartition ID\tState\\r?\\n(%1$s)\t%2$s\t%2$s_table\t1\t(HEALTHY|AVAILABLE)\\r?\\n",
+                possibleNodeNames,
+                zoneName)
+        );
+    }
+
+    private void checkOutput(boolean global, Set<String> zoneNames, Set<String> nodes, int partitions) {
+        assertErrOutputIsEmpty();
+        assertOutputStartsWith((global ? "" : "Node name\t") + "Zone name\tTable name\tPartition ID\tState");
+
+        if (!global) {
+            if (!nodes.isEmpty()) {
+                assertOutputContainsAny(nodes);
+            }
+
+            Set<String> anotherNodes = CollectionUtils.difference(nodeNames, nodes);
+
+            if (!anotherNodes.isEmpty()) {
+                assertOutputDoesNotContainIgnoreCase(anotherNodes);
+            }
+        }
+
+        if (!zoneNames.isEmpty()) {
+            assertOutputContainsAll(zoneNames);
+
+            Set<String> tableNames = zoneNames.stream().map(it -> it + "_table").collect(toSet());
+
+            assertOutputContainsAllIgnoringCase(tableNames);
+        }
+
+        Set<String> anotherZones = CollectionUtils.difference(ZONES, zoneNames);
+
+        if (!anotherZones.isEmpty()) {
+            assertOutputDoesNotContain(anotherZones);
+        }
+
+        if (!zoneNames.isEmpty() && nodeNames.isEmpty()) {
+            assertOutputContainsAny(STATES);
+        }
+
+        if (partitions != DONT_CHECK_PARTITIONS) {
+            for (int i = 0; i < partitions; i++) {
+                assertOutputContains("\t" + i + "\t");
+            }
+
+            for (int i = partitions; i < DEFAULT_PARTITION_COUNT; i++) {
+                assertOutputDoesNotContain("\t" + i + "\t");
+            }
+        }
+    }
+}
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/recovery/PartitionStatesCall.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/recovery/PartitionStatesCall.java
new file mode 100644
index 0000000..f547e29
--- /dev/null
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/recovery/PartitionStatesCall.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.call.recovery;
+
+import static java.util.stream.Collectors.toList;
+
+import jakarta.inject.Singleton;
+import java.util.List;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.cli.core.call.Call;
+import org.apache.ignite.internal.cli.core.call.DefaultCallOutput;
+import org.apache.ignite.internal.cli.core.exception.IgniteCliApiException;
+import org.apache.ignite.internal.cli.core.rest.ApiClientFactory;
+import org.apache.ignite.internal.cli.sql.table.Table;
+import org.apache.ignite.rest.client.api.RecoveryApi;
+import org.apache.ignite.rest.client.invoker.ApiException;
+import org.apache.ignite.rest.client.model.GlobalPartitionStatesResponse;
+import org.apache.ignite.rest.client.model.LocalPartitionStatesResponse;
+
+/** Call to get partition states. */
+@Singleton
+public class PartitionStatesCall implements Call<PartitionStatesCallInput, Table> {
+    private final ApiClientFactory clientFactory;
+
+    private static final List<String> GLOBAL_HEADERS = List.of("Zone name", "Table name", "Partition ID", "State");
+
+    private static final List<String> LOCAL_HEADERS = Stream
+            .concat(Stream.of("Node name"), GLOBAL_HEADERS.stream())
+            .collect(toList());
+
+    public PartitionStatesCall(ApiClientFactory clientFactory) {
+        this.clientFactory = clientFactory;
+    }
+
+    @Override
+    public DefaultCallOutput<Table> execute(PartitionStatesCallInput input) {
+        RecoveryApi client = new RecoveryApi(clientFactory.getClient(input.clusterUrl()));
+
+        List<String> trimmedZoneNames = trim(input.zoneNames());
+
+        try {
+            if (input.local()) {
+                return getLocalPartitionStatesOutput(client, trimmedZoneNames, input);
+            } else {
+                return getGlobalPartitionStatesOutput(input, client, trimmedZoneNames);
+            }
+        } catch (ApiException e) {
+            return DefaultCallOutput.failure(new IgniteCliApiException(e, input.clusterUrl()));
+        }
+    }
+
+    private static DefaultCallOutput<Table> getGlobalPartitionStatesOutput(
+            PartitionStatesCallInput input,
+            RecoveryApi client,
+            List<String> trimmedZoneNames
+    ) throws ApiException {
+        GlobalPartitionStatesResponse globalStates = client.getGlobalPartitionStates(
+                trimmedZoneNames,
+                input.partitionIds()
+        );
+
+        List<String> content = globalStates.getStates().stream()
+                .flatMap(state -> Stream.of(
+                                state.getZoneName(),
+                                state.getTableName(),
+                                String.valueOf(state.getPartitionId()),
+                                state.getState()
+                        )
+                )
+                .collect(toList());
+
+        return DefaultCallOutput.success(new Table(GLOBAL_HEADERS, content));
+    }
+
+    private static DefaultCallOutput<Table> getLocalPartitionStatesOutput(
+            RecoveryApi client,
+            List<String> trimmedZoneNames,
+            PartitionStatesCallInput input)
+            throws ApiException {
+        List<String> trimmedNodeNames = trim(input.nodeNames());
+
+        LocalPartitionStatesResponse localStates = client.getLocalPartitionStates(
+                trimmedZoneNames,
+                trimmedNodeNames,
+                input.partitionIds()
+        );
+
+        List<String> content;
+        content = localStates.getStates().stream()
+                .flatMap(state -> Stream.of(
+                                state.getNodeName(),
+                                state.getZoneName(),
+                                state.getTableName(),
+                                String.valueOf(state.getPartitionId()),
+                                state.getState()
+                        )
+                )
+                .collect(toList());
+
+        return DefaultCallOutput.success(new Table(LOCAL_HEADERS, content));
+    }
+
+    private static List<String> trim(List<String> names) {
+        return names == null
+                ? List.of()
+                : names.stream()
+                        .map(String::trim)
+                        .collect(toList());
+    }
+}
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/recovery/PartitionStatesCallInput.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/recovery/PartitionStatesCallInput.java
new file mode 100644
index 0000000..4da4275
--- /dev/null
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/recovery/PartitionStatesCallInput.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.call.recovery;
+
+import java.util.List;
+import org.apache.ignite.internal.cli.commands.recovery.partitions.PartitionStatesMixin;
+import org.apache.ignite.internal.cli.core.call.CallInput;
+
+/** Input for the {@link PartitionStatesCall} call. */
+public class PartitionStatesCallInput implements CallInput {
+    private final String clusterUrl;
+
+    private final boolean local;
+
+    private final List<String> nodeNames;
+
+    private final List<String> zoneNames;
+
+    private final List<Integer> partitionIds;
+
+    /** Cluster url. */
+    public String clusterUrl() {
+        return clusterUrl;
+    }
+
+    /** If local partition states should be returned. */
+    public boolean local() {
+        return local;
+    }
+
+    /** Returns node names to get local partition states from. */
+    public List<String> nodeNames() {
+        return nodeNames;
+    }
+
+    /** Names of zones to get partition states of. */
+    public List<String> zoneNames() {
+        return zoneNames;
+    }
+
+    /** IDs of partitions to get states of. */
+    public List<Integer> partitionIds() {
+        return partitionIds;
+    }
+
+    private PartitionStatesCallInput(
+            String clusterUrl,
+            boolean local,
+            List<String> nodeNames,
+            List<String> zoneNames,
+            List<Integer> partitionIds
+    ) {
+        this.clusterUrl = clusterUrl;
+        this.local = local;
+        this.nodeNames = nodeNames == null ? List.of() : List.copyOf(nodeNames);
+        this.zoneNames = zoneNames == null ? List.of() : List.copyOf(zoneNames);
+        this.partitionIds = partitionIds == null ? List.of() : List.copyOf(partitionIds);
+    }
+
+    public static PartitionStatesCallInput of(PartitionStatesMixin statesArgs) {
+        return of(statesArgs, statesArgs.clusterUrl());
+    }
+
+    /** Returns {@link PartitionStatesCallInput} with specified arguments. */
+    public static PartitionStatesCallInput of(PartitionStatesMixin statesArgs, String clusterUrl) {
+        return builder()
+                .local(statesArgs.local())
+                .nodeNames(statesArgs.nodeNames())
+                .zoneNames(statesArgs.zoneNames())
+                .partitionIds(statesArgs.partitionIds())
+                .clusterUrl(clusterUrl)
+                .build();
+    }
+
+    /**
+     * Builder method provider.
+     *
+     * @return new instance of {@link PartitionStatesCallInputBuilder}.
+     */
+    private static PartitionStatesCallInputBuilder builder() {
+        return new PartitionStatesCallInputBuilder();
+    }
+
+    /** Builder for {@link PartitionStatesCallInput}. */
+    private static class PartitionStatesCallInputBuilder {
+        private String clusterUrl;
+
+        private boolean local;
+
+        private List<String> nodeNames;
+
+        private List<String> zoneNames;
+
+        private List<Integer> partitionIds;
+
+        /** Set cluster URL. */
+        PartitionStatesCallInputBuilder clusterUrl(String clusterUrl) {
+            this.clusterUrl = clusterUrl;
+            return this;
+        }
+
+        /** Set flag to get local partition states. */
+        PartitionStatesCallInputBuilder local(boolean local) {
+            this.local = local;
+            return this;
+        }
+
+        /** Set names of zones to get partition states of. */
+        PartitionStatesCallInputBuilder nodeNames(List<String> nodeNames) {
+            this.nodeNames = nodeNames;
+            return this;
+        }
+
+        /** Set names of zones to get partition states of. */
+        PartitionStatesCallInputBuilder zoneNames(List<String> zoneNames) {
+            this.zoneNames = zoneNames;
+            return this;
+        }
+
+        /** Names of zones to get partition states of. */
+        PartitionStatesCallInputBuilder partitionIds(List<Integer> partitionIds) {
+            this.partitionIds = partitionIds;
+            return this;
+        }
+
+        /** Set IDs of partitions to get states of. */
+        PartitionStatesCallInput build() {
+            return new PartitionStatesCallInput(clusterUrl, local, nodeNames, zoneNames, partitionIds);
+        }
+    }
+}
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/Options.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/Options.java
index 57a6ff0..1d3ad48 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/Options.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/Options.java
@@ -283,6 +283,26 @@
 
         public static final String PASSWORD_KEY = CliConfigKeys.Constants.BASIC_AUTHENTICATION_PASSWORD;
 
+        public static final String RECOVERY_PARTITION_GLOBAL_OPTION = "--global";
 
+        public static final String RECOVERY_PARTITION_GLOBAL_OPTION_DESC = "Get global partitions states";
+
+        public static final String RECOVERY_PARTITION_LOCAL_OPTION = "--local";
+
+        public static final String RECOVERY_PARTITION_LOCAL_OPTION_DESC = "Get local partition states";
+
+        public static final String RECOVERY_PARTITION_IDS_OPTION = "--partitions";
+
+        public static final String RECOVERY_PARTITION_IDS_OPTION_DESC = "IDs of partitions to get states. All partitions if not set";
+
+        public static final String RECOVERY_ZONE_NAMES_OPTION = "--zones";
+
+        public static final String RECOVERY_ZONE_NAMES_OPTION_DESC = "Names specifying zones to get partition states from. "
+                + "Case-sensitive, all zones if not set";
+
+        public static final String RECOVERY_NODE_NAMES_OPTION = "--nodes";
+
+        public static final String RECOVERY_NODE_NAMES_OPTION_DESC = "Names specifying nodes to get partition states from. "
+                + "Case-sensitive, all nodes if not set";
     }
 }
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/TopLevelCliCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/TopLevelCliCommand.java
index ee13d88..610e13a 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/TopLevelCliCommand.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/TopLevelCliCommand.java
@@ -25,6 +25,7 @@
 import org.apache.ignite.internal.cli.commands.cluster.ClusterCommand;
 import org.apache.ignite.internal.cli.commands.connect.ConnectCommand;
 import org.apache.ignite.internal.cli.commands.node.NodeCommand;
+import org.apache.ignite.internal.cli.commands.recovery.RecoveryCommand;
 import org.apache.ignite.internal.cli.commands.sql.SqlCommand;
 import picocli.CommandLine;
 import picocli.CommandLine.Command;
@@ -45,7 +46,8 @@
                 CliCommand.class,
                 ConnectCommand.class,
                 NodeCommand.class,
-                ClusterCommand.class
+                ClusterCommand.class,
+                RecoveryCommand.class
         })
 public class TopLevelCliCommand extends BaseCommand {
     @SuppressWarnings("PMD.UnusedPrivateField")
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/TopLevelCliReplCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/TopLevelCliReplCommand.java
index 53c99a3..df93c01 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/TopLevelCliReplCommand.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/TopLevelCliReplCommand.java
@@ -22,6 +22,7 @@
 import org.apache.ignite.internal.cli.commands.connect.ConnectReplCommand;
 import org.apache.ignite.internal.cli.commands.connect.DisconnectCommand;
 import org.apache.ignite.internal.cli.commands.node.NodeReplCommand;
+import org.apache.ignite.internal.cli.commands.recovery.RecoveryReplCommand;
 import org.apache.ignite.internal.cli.commands.sql.SqlReplCommand;
 import org.apache.ignite.internal.cli.commands.version.VersionCommand;
 import picocli.CommandLine;
@@ -42,7 +43,8 @@
                 ConnectReplCommand.class,
                 DisconnectCommand.class,
                 NodeReplCommand.class,
-                ClusterReplCommand.class
+                ClusterReplCommand.class,
+                RecoveryReplCommand.class
         })
 public class TopLevelCliReplCommand {
 }
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/RecoveryCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/RecoveryCommand.java
new file mode 100644
index 0000000..bba19e6
--- /dev/null
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/RecoveryCommand.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands.recovery;
+
+import org.apache.ignite.internal.cli.commands.BaseCommand;
+import org.apache.ignite.internal.cli.commands.recovery.partitions.PartitionStatesCommand;
+import picocli.CommandLine.Command;
+
+/** Disaster recovery command. */
+@Command(name = "recovery",
+        subcommands = {
+                PartitionStatesCommand.class
+        },
+        description = "Managers disaster recovery of Ignite cluster")
+public class RecoveryCommand extends BaseCommand {
+}
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/RecoveryReplCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/RecoveryReplCommand.java
new file mode 100644
index 0000000..dfc26a2
--- /dev/null
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/RecoveryReplCommand.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands.recovery;
+
+import org.apache.ignite.internal.cli.commands.BaseCommand;
+import org.apache.ignite.internal.cli.commands.recovery.partitions.PartitionStatesReplCommand;
+import picocli.CommandLine.Command;
+
+/** Disaster recovery command. */
+@Command(name = "recovery",
+        subcommands = {
+                PartitionStatesReplCommand.class
+        },
+        description = "Managers disaster recovery of Ignite cluster")
+public class RecoveryReplCommand extends BaseCommand {
+}
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/PartitionStatesCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/PartitionStatesCommand.java
new file mode 100644
index 0000000..1829a28
--- /dev/null
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/PartitionStatesCommand.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands.recovery.partitions;
+
+import jakarta.inject.Inject;
+import java.util.concurrent.Callable;
+import org.apache.ignite.internal.cli.call.recovery.PartitionStatesCall;
+import org.apache.ignite.internal.cli.call.recovery.PartitionStatesCallInput;
+import org.apache.ignite.internal.cli.commands.BaseCommand;
+import org.apache.ignite.internal.cli.core.call.CallExecutionPipeline;
+import org.apache.ignite.internal.cli.decorators.TableDecorator;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Mixin;
+
+/** Command to get partition states. */
+@Command(name = "partition-states", description = "Returns partition states.")
+public class PartitionStatesCommand extends BaseCommand implements Callable<Integer> {
+    @Mixin
+    private PartitionStatesMixin options;
+
+    @Inject
+    private PartitionStatesCall call;
+
+    @Override
+    public Integer call() {
+        return CallExecutionPipeline.builder(call)
+                .inputProvider(() -> PartitionStatesCallInput.of(options))
+                .output(spec.commandLine().getOut())
+                .errOutput(spec.commandLine().getErr())
+                .decorator(new TableDecorator(options.plain()))
+                .verbose(verbose)
+                .build()
+                .runPipeline();
+    }
+}
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/PartitionStatesMixin.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/PartitionStatesMixin.java
new file mode 100644
index 0000000..d6b4bd9
--- /dev/null
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/PartitionStatesMixin.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands.recovery.partitions;
+
+import static org.apache.ignite.internal.cli.commands.Options.Constants.PLAIN_OPTION;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.PLAIN_OPTION_DESC;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_NODE_NAMES_OPTION;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_NODE_NAMES_OPTION_DESC;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_PARTITION_GLOBAL_OPTION;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_PARTITION_GLOBAL_OPTION_DESC;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_PARTITION_IDS_OPTION;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_PARTITION_IDS_OPTION_DESC;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_PARTITION_LOCAL_OPTION;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_PARTITION_LOCAL_OPTION_DESC;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_ZONE_NAMES_OPTION;
+import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_ZONE_NAMES_OPTION_DESC;
+
+import java.util.List;
+import org.apache.ignite.internal.cli.commands.cluster.ClusterUrlMixin;
+import picocli.CommandLine.ArgGroup;
+import picocli.CommandLine.Mixin;
+import picocli.CommandLine.Option;
+
+/** Arguments for recovery partition states command. */
+public class PartitionStatesMixin {
+    /** Cluster endpoint URL option. */
+    @Mixin
+    private ClusterUrlMixin clusterUrl;
+
+    /** Specific local / global states filters. */
+    @ArgGroup(exclusive = true, multiplicity = "1")
+    private PartitionStatesArgGroup statesArgs;
+
+    /** IDs of partitions to get states of. */
+    @Option(names = RECOVERY_PARTITION_IDS_OPTION, description = RECOVERY_PARTITION_IDS_OPTION_DESC, split = ",")
+    private List<Integer> partitionIds;
+
+    /** Names of zones to get partition states of. */
+    @Option(names = RECOVERY_ZONE_NAMES_OPTION, description = RECOVERY_ZONE_NAMES_OPTION_DESC, split = ",")
+    private List<String> zoneNames;
+
+    /** Plain formatting of the table. */
+    @Option(names = PLAIN_OPTION, description = PLAIN_OPTION_DESC)
+    private boolean plain;
+
+    /** Return node names to get partition states from. */
+    public List<String> nodeNames() {
+        return statesArgs.localGroup() == null ? List.of() : statesArgs.localGroup().nodeNames();
+    }
+
+    /** If should return local partition states. */
+    public boolean local() {
+        return statesArgs.localGroup() != null;
+    }
+
+    public boolean plain() {
+        return plain;
+    }
+
+    public List<String> zoneNames() {
+        return zoneNames;
+    }
+
+    public List<Integer> partitionIds() {
+        return partitionIds;
+    }
+
+    public String clusterUrl() {
+        return clusterUrl.getClusterUrl();
+    }
+
+    static class PartitionStatesArgGroup {
+        @Option(names = RECOVERY_PARTITION_GLOBAL_OPTION, description = RECOVERY_PARTITION_GLOBAL_OPTION_DESC)
+        private boolean global;
+
+        @ArgGroup(exclusive = false)
+        private LocalGroup localGroup;
+
+        /** If global partition states should be returned. */
+        public boolean global() {
+            return global;
+        }
+
+        /** Returns arguments specific to local partition states. */
+        LocalGroup localGroup() {
+            return localGroup;
+        }
+
+        /** Arguments specific to local partition states. */
+        private static class LocalGroup {
+            @Option(required = true, names = RECOVERY_PARTITION_LOCAL_OPTION, description = RECOVERY_PARTITION_LOCAL_OPTION_DESC)
+            private boolean local;
+
+            @Option(names = RECOVERY_NODE_NAMES_OPTION, description = RECOVERY_NODE_NAMES_OPTION_DESC, split = ",")
+            private List<String> nodeNames;
+
+            /** Returns node names to get local partition states from. */
+            List<String> nodeNames() {
+                return nodeNames;
+            }
+
+            /** If local partition states should be returned. */
+            boolean local() {
+                return local;
+            }
+        }
+    }
+}
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/PartitionStatesReplCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/PartitionStatesReplCommand.java
new file mode 100644
index 0000000..624d032
--- /dev/null
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/PartitionStatesReplCommand.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands.recovery.partitions;
+
+import jakarta.inject.Inject;
+import org.apache.ignite.internal.cli.call.recovery.PartitionStatesCall;
+import org.apache.ignite.internal.cli.call.recovery.PartitionStatesCallInput;
+import org.apache.ignite.internal.cli.commands.BaseCommand;
+import org.apache.ignite.internal.cli.commands.questions.ConnectToClusterQuestion;
+import org.apache.ignite.internal.cli.core.flow.builder.Flows;
+import org.apache.ignite.internal.cli.decorators.TableDecorator;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Mixin;
+
+/** Command to get partition states. */
+@Command(name = "partition-states", description = "Returns partition states.")
+public class PartitionStatesReplCommand extends BaseCommand implements Runnable {
+    @Mixin
+    private PartitionStatesMixin options;
+
+    @Inject
+    private ConnectToClusterQuestion question;
+
+    @Inject
+    private PartitionStatesCall call;
+
+    @Override
+    public void run() {
+        question.askQuestionIfNotConnected(options.clusterUrl())
+                .map(url -> PartitionStatesCallInput.of(options, url))
+                .then(Flows.fromCall(call))
+                .print(new TableDecorator(options.plain()))
+                .verbose(verbose)
+                .start();
+    }
+
+}
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h b/modules/platforms/cpp/ignite/common/error_codes.h
index 4bdb6d0..4c0a079 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -207,7 +207,7 @@
     SYSTEM_CRITICAL_OPERATION_TIMEOUT = 0x130002,
 
     // DisasterRecovery group. Group code: 20
-    PARTITIONS_NOT_FOUND = 0x140001,
+    ILLEGAL_PARTITION_ID = 0x140001,
     NODES_NOT_FOUND = 0x140002,
     PARTITION_STATE = 0x140003
 };
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index 7c420aa..e19562a 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -299,7 +299,7 @@
 
         // DisasterRecovery group. Group code: 20
         case error::code::NODES_NOT_FOUND:
-        case error::code::PARTITIONS_NOT_FOUND:
+        case error::code::ILLEGAL_PARTITION_ID:
         case error::code::PARTITION_STATE:
             return sql_state::SHY000_GENERAL_ERROR;
     }
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index e96965a..b96e5a7 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -579,8 +579,8 @@
             /// <summary> DisasterRecovery group name. </summary>
             public const String GroupName = "RECOVERY";
 
-            /// <summary> PartitionsNotFound error. </summary>
-            public const int PartitionsNotFound = (GroupCode << 16) | (1 & 0xFFFF);
+            /// <summary> IllegalPartitionId error. </summary>
+            public const int IllegalPartitionId = (GroupCode << 16) | (1 & 0xFFFF);
 
             /// <summary> NodesNotFound error. </summary>
             public const int NodesNotFound = (GroupCode << 16) | (2 & 0xFFFF);
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
index 1b1e63b..576acb3 100644
--- a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
@@ -39,7 +39,7 @@
 
     private static final Set<Integer> BAD_REQUEST_CODES = Set.of(
             DistributionZones.ZONE_NOT_FOUND_ERR,
-            DisasterRecovery.PARTITIONS_NOT_FOUND_ERR,
+            DisasterRecovery.ILLEGAL_PARTITION_ID_ERR,
             DisasterRecovery.NODES_NOT_FOUND_ERR
     );
 
diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
index e88c73b..1798a78 100644
--- a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
+++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
@@ -128,13 +128,37 @@
     }
 
     @Test
-    void testLocalPartitionStatesPartitionNotFound() {
+    void testLocalPartitionStatesNegativePartition() {
         HttpClientResponseException thrown = assertThrows(
                 HttpClientResponseException.class,
-                () -> client.toBlocking().exchange("/state/local?partitionIds=-1", LocalPartitionStatesResponse.class)
+                () -> client.toBlocking().exchange("/state/local?partitionIds=0,1,-1,-10", LocalPartitionStatesResponse.class)
         );
 
         assertEquals(HttpStatus.BAD_REQUEST, thrown.getResponse().status());
+        assertThat(thrown.getMessage(), containsString("Partition ID can't be negative, found: -10"));
+    }
+
+    @Test
+    void testLocalPartitionStatesPartitionOutOfRange() {
+        String zoneName = ZONES_CONTAINING_TABLES.stream().findAny().get();
+
+        HttpClientResponseException thrown = assertThrows(
+                HttpClientResponseException.class,
+                () -> client.toBlocking().exchange(
+                        String.format("/state/local?partitionIds=0,4,%d&zoneNames=%s", DEFAULT_PARTITION_COUNT, zoneName),
+                        LocalPartitionStatesResponse.class
+                )
+        );
+
+        assertEquals(HttpStatus.BAD_REQUEST, thrown.getResponse().status());
+        assertThat(thrown.getMessage(), containsString(
+                        String.format(
+                                "Partition IDs should be in range [0, %d] for zone %s, found: %d",
+                                DEFAULT_PARTITION_COUNT - 1,
+                                zoneName,
+                                DEFAULT_PARTITION_COUNT
+                        )
+                ));
     }
 
     @Test
@@ -243,14 +267,38 @@
     }
 
     @Test
-    void testGlobalPartitionStatesPartitionNotFound() {
+    void testGlobalPartitionStatesIllegalPartitionNegative() {
         HttpClientResponseException thrown = assertThrows(
                 HttpClientResponseException.class,
-                () -> client.toBlocking().exchange("/state/global?partitionIds=-1", GlobalPartitionStatesResponse.class)
+                () -> client.toBlocking().exchange("/state/local?partitionIds=0,1,-1,-10", GlobalPartitionStatesResponse.class)
         );
 
         assertEquals(HttpStatus.BAD_REQUEST, thrown.getResponse().status());
-        assertThat(thrown.getMessage(), containsString("Some partitions are missing: [-1]"));
+        assertThat(thrown.getMessage(), containsString("Partition ID can't be negative, found: -10"));
+    }
+
+    @Test
+    void testGlobalPartitionStatesPartitionsOutOfRange() {
+        String zoneName = ZONES_CONTAINING_TABLES.stream().findAny().get();
+
+        HttpClientResponseException thrown = assertThrows(
+                HttpClientResponseException.class,
+                () -> client.toBlocking().exchange(
+                        String.format("/state/global?partitionIds=0,4,%d&zoneNames=%s", DEFAULT_PARTITION_COUNT, zoneName),
+                        GlobalPartitionStatesResponse.class
+                )
+        );
+
+
+        assertEquals(HttpStatus.BAD_REQUEST, thrown.getResponse().status());
+        assertThat(thrown.getMessage(), containsString(
+                        String.format(
+                                "Partition IDs should be in range [0, %d] for zone %s, found: %d",
+                                DEFAULT_PARTITION_COUNT - 1,
+                                zoneName,
+                                DEFAULT_PARTITION_COUNT
+                        )
+                ));
     }
 
     @Test
@@ -286,6 +334,25 @@
         checkGlobalStates(response.body().states(), MIXED_CASE_ZONES);
     }
 
+    @Test
+    void testGlobalPartitionStatesByPartitions() {
+        Set<String> partitionIds = Set.of("1", "2");
+
+        String url = "state/global?partitionIds=" + String.join(",", partitionIds);
+
+        var response = client.toBlocking().exchange(url, GlobalPartitionStatesResponse.class);
+
+        assertEquals(HttpStatus.OK, response.status());
+
+        List<GlobalPartitionStateResponse> states = response.body().states();
+
+        for (GlobalPartitionStateResponse state : states) {
+            assertTrue(partitionIds.contains((String.valueOf(state.partitionId()))));
+        }
+
+        checkGlobalStates(states, ZONES_CONTAINING_TABLES);
+    }
+
     private static void checkLocalStates(List<LocalPartitionStateResponse> states, Set<String> zoneNames, Set<String> nodes) {
         assertFalse(states.isEmpty());
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 6907623..436ccbb 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -69,8 +69,8 @@
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
+import org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalPartitionIdException;
 import org.apache.ignite.internal.table.distributed.disaster.exceptions.NodesNotFoundException;
-import org.apache.ignite.internal.table.distributed.disaster.exceptions.PartitionsNotFoundException;
 import org.apache.ignite.internal.table.distributed.disaster.exceptions.ZonesNotFoundException;
 import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStateMessage;
 import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesRequest;
@@ -236,7 +236,7 @@
 
         return localPartitionStatesInternal(zoneNames, Set.of(), partitionIds, catalog)
                 .thenApply(res -> normalizeLocal(res, catalog))
-                .thenApply(res -> assembleGlobal(res, catalog));
+                .thenApply(res -> assembleGlobal(res, partitionIds, catalog));
     }
 
     private CompletableFuture<Map<TablePartitionId, LocalPartitionStateMessageByNode>> localPartitionStatesInternal(
@@ -245,10 +245,28 @@
             Set<Integer> partitionIds,
             Catalog catalog
     ) {
-        Set<Integer> zoneIds = getZoneIds(zoneNames, catalog);
+        Collection<CatalogZoneDescriptor> zones = filterZones(zoneNames, catalog.zones());
+
+        if (!partitionIds.isEmpty()) {
+            int minPartition = partitionIds.stream().min(Integer::compare).get();
+
+            if (minPartition < 0) {
+                throw new IllegalPartitionIdException(minPartition);
+            }
+
+            int maxPartition = partitionIds.stream().max(Integer::compare).get();
+
+            zones.forEach(zone -> {
+                if (maxPartition >= zone.partitions()) {
+                    throw new IllegalPartitionIdException(maxPartition, zone.partitions(), zone.name());
+                }
+            });
+        }
 
         Set<NodeWithAttributes> nodes = getNodes(nodeNames);
 
+        Set<Integer> zoneIds = zones.stream().map(CatalogObjectDescriptor::id).collect(toSet());
+
         LocalPartitionStatesRequest localPartitionStatesRequest = MSG_FACTORY.localPartitionStatesRequest()
                 .zoneIds(zoneIds)
                 .partitionIds(partitionIds)
@@ -290,14 +308,6 @@
                 throw new DisasterRecoveryException(PARTITION_STATE_ERR, err);
             }
 
-            if (!partitionIds.isEmpty()) {
-                Set<Integer> foundPartitionIds = result.keySet().stream()
-                        .map(TablePartitionId::partitionId)
-                        .collect(toSet());
-
-                checkPartitions(foundPartitionIds, partitionIds);
-            }
-
             return result;
         });
     }
@@ -324,12 +334,13 @@
         return nodes;
     }
 
-    private static Set<Integer> getZoneIds(Set<String> zoneNames, Catalog catalog) throws ZonesNotFoundException {
+    private static Collection<CatalogZoneDescriptor> filterZones(Set<String> zoneNames, Collection<CatalogZoneDescriptor> zones)
+            throws ZonesNotFoundException {
         if (zoneNames.isEmpty()) {
-            return Set.of();
+            return zones;
         }
 
-        List<CatalogZoneDescriptor> zoneDescriptors = catalog.zones().stream()
+        List<CatalogZoneDescriptor> zoneDescriptors = zones.stream()
                 .filter(catalogZoneDescriptor -> zoneNames.contains(catalogZoneDescriptor.name()))
                 .collect(toList());
 
@@ -343,9 +354,7 @@
             throw new ZonesNotFoundException(missingZoneNames);
         }
 
-        return zoneDescriptors.stream()
-                .map(CatalogObjectDescriptor::id)
-                .collect(toSet());
+        return zoneDescriptors;
     }
 
     /**
@@ -493,24 +502,6 @@
     }
 
     /**
-     * Checks that resulting states contain all partitions IDs from the request.
-     *
-     * @param foundPartitionIds Found partition IDs.
-     * @param requestedPartitionIds Requested partition IDs.
-     * @throws PartitionsNotFoundException if some IDs are missing.
-     */
-    private static void checkPartitions(
-            Set<Integer> foundPartitionIds,
-            Set<Integer> requestedPartitionIds
-    ) throws PartitionsNotFoundException {
-        if (!requestedPartitionIds.equals(foundPartitionIds)) {
-            Set<Integer> missingPartitionIds = CollectionUtils.difference(requestedPartitionIds, foundPartitionIds);
-
-            throw new PartitionsNotFoundException(missingPartitionIds);
-        }
-    }
-
-    /**
      * Replaces some healthy states with a {@link LocalPartitionStateEnum#CATCHING_UP}, it can only be done once the state of all peers is
      * known.
      */
@@ -565,6 +556,7 @@
 
     private static Map<TablePartitionId, GlobalPartitionState> assembleGlobal(
             Map<TablePartitionId, LocalPartitionStateByNode> localResult,
+            Set<Integer> partitionIds,
             Catalog catalog
     ) {
         Map<TablePartitionId, GlobalPartitionState> result = localResult.entrySet().stream()
@@ -575,26 +567,47 @@
                     return assembleGlobalStateFromLocal(catalog, tablePartitionId, map);
                 }));
 
+        makeMissingPartitionsUnavailable(localResult, catalog, result, partitionIds);
+
+        return result;
+    }
+
+    private static void makeMissingPartitionsUnavailable(Map<TablePartitionId, LocalPartitionStateByNode> localResult, Catalog catalog,
+            Map<TablePartitionId, GlobalPartitionState> result, Set<Integer> partitionIds) {
         localResult.keySet().stream()
                 .map(TablePartitionId::tableId)
                 .distinct()
                 .forEach(tableId -> {
                     int zoneId = catalog.table(tableId).zoneId();
                     CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
-                    int partitions = zoneDescriptor.partitions();
 
-                    // Make missing partitions explicitly unavailable.
-                    for (int partitionId = 0; partitionId < partitions; partitionId++) {
-                        TablePartitionId tablePartitionId = new TablePartitionId(tableId, partitionId);
+                    if (partitionIds.isEmpty()) {
+                        int partitions = zoneDescriptor.partitions();
 
-                        result.computeIfAbsent(tablePartitionId, key ->
-                                new GlobalPartitionState(catalog.table(key.tableId()).name(), zoneDescriptor.name(),  key.partitionId(),
-                                        GlobalPartitionStateEnum.UNAVAILABLE)
-                        );
+                        for (int partitionId = 0; partitionId < partitions; partitionId++) {
+                            putUnavailableStateIfAbsent(catalog, result, tableId, partitionId, zoneDescriptor);
+                        }
+                    } else {
+                        partitionIds.forEach(id -> {
+                            putUnavailableStateIfAbsent(catalog, result, tableId, id, zoneDescriptor);
+                        });
                     }
                 });
+    }
 
-        return result;
+    private static void putUnavailableStateIfAbsent(
+            Catalog catalog,
+            Map<TablePartitionId, GlobalPartitionState> states,
+            Integer tableId,
+            int partitionId,
+            CatalogZoneDescriptor zoneDescriptor
+    ) {
+        TablePartitionId tablePartitionId = new TablePartitionId(tableId, partitionId);
+
+        states.computeIfAbsent(tablePartitionId, key ->
+                new GlobalPartitionState(catalog.table(key.tableId()).name(), zoneDescriptor.name(), key.partitionId(),
+                        GlobalPartitionStateEnum.UNAVAILABLE)
+        );
     }
 
     private static GlobalPartitionState assembleGlobalStateFromLocal(
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/IllegalPartitionIdException.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/IllegalPartitionIdException.java
new file mode 100644
index 0000000..406c065
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/IllegalPartitionIdException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
+
+import static org.apache.ignite.lang.ErrorGroups.DisasterRecovery.ILLEGAL_PARTITION_ID_ERR;
+
+/** Exception is thrown when illegal partition was requested. */
+public class IllegalPartitionIdException extends DisasterRecoveryException {
+    private static final long serialVersionUID = -9215416423159317425L;
+
+    /** Creates exception that partition ID is negative. */
+    public IllegalPartitionIdException(int partitionId) {
+        super(ILLEGAL_PARTITION_ID_ERR, "Partition ID can't be negative, found: " + partitionId);
+    }
+
+    /** Creates exception that partition ID is bigger that partition count for zone. */
+    public IllegalPartitionIdException(int partitionId, int partitions, String zoneName) {
+        super(
+                ILLEGAL_PARTITION_ID_ERR,
+                String.format("Partition IDs should be in range [0, %d] for zone %s, found: %d", partitions - 1, zoneName, partitionId)
+        );
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/PartitionsNotFoundException.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/PartitionsNotFoundException.java
deleted file mode 100644
index 29767db..0000000
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/PartitionsNotFoundException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed.disaster.exceptions;
-
-import static org.apache.ignite.lang.ErrorGroups.DisasterRecovery.PARTITIONS_NOT_FOUND_ERR;
-
-import java.util.Set;
-
-/** Exception is thrown when appropriate partition can`t be found. */
-public class PartitionsNotFoundException extends DisasterRecoveryException {
-    private static final long serialVersionUID = -9215416423159317425L;
-
-    public PartitionsNotFoundException(Set<Integer> missingPartitionIds) {
-        super(PARTITIONS_NOT_FOUND_ERR, "Some partitions are missing: " + missingPartitionIds);
-    }
-}