IGNITE-21257 Introduce REST API for viewing partition states (#3614)

diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
new file mode 100644
index 0000000..4fa1b92
--- /dev/null
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import io.micronaut.http.annotation.Controller;
+import io.micronaut.http.annotation.Get;
+import io.micronaut.http.annotation.PathVariable;
+import io.micronaut.http.annotation.Produces;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.media.Content;
+import io.swagger.v3.oas.annotations.media.Schema;
+import io.swagger.v3.oas.annotations.responses.ApiResponse;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.rest.api.Problem;
+import org.apache.ignite.internal.rest.constants.MediaType;
+
+/**
+ * Disaster recovery controller.
+ */
+@Controller("/management/v1/recovery")
+@Tag(name = "recovery")
+public interface DisasterRecoveryApi {
+    @Get("state/local")
+    @Operation(operationId = "getLocalPartitionStates", description = "Returns local partition states.")
+    @ApiResponse(responseCode = "200", description = "Partition states returned.")
+    @ApiResponse(responseCode = "500", description = "Internal error.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
+    @Produces(MediaType.APPLICATION_JSON)
+    CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates();
+
+    @Get("state/local/{zoneName}")
+    @Operation(operationId = "getLocalPartitionStatesByZone", description = "Returns local partition states.")
+    @ApiResponse(responseCode = "200", description = "Partition states returned.")
+    @ApiResponse(responseCode = "500", description = "Internal error.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
+    @ApiResponse(responseCode = "404", description = "Zone is not found.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
+    @Produces(MediaType.APPLICATION_JSON)
+    CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates(@PathVariable("zoneName") String zoneName);
+
+    @Get("state/global")
+    @Operation(operationId = "getGlobalPartitionStates", description = "Returns global partition states.")
+    @ApiResponse(responseCode = "200", description = "Partition states returned.")
+    @ApiResponse(responseCode = "500", description = "Internal error.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
+    @Produces(MediaType.APPLICATION_JSON)
+    CompletableFuture<GlobalPartitionStatesResponse> getGlobalPartitionStates();
+
+    @Get("state/global/{zoneName}")
+    @Operation(operationId = "getGlobalPartitionStatesByZone", description = "Returns global partition states.")
+    @ApiResponse(responseCode = "200", description = "Partition states returned.")
+    @ApiResponse(responseCode = "500", description = "Internal error.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
+    @ApiResponse(responseCode = "404", description = "Zone is not found.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
+    @Produces(MediaType.APPLICATION_JSON)
+    CompletableFuture<GlobalPartitionStatesResponse> getGlobalPartitionStates(@PathVariable("zoneName") String zoneName);
+}
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStateResponse.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStateResponse.java
new file mode 100644
index 0000000..20dbbe9
--- /dev/null
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStateResponse.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.media.Schema;
+
+/**
+ * Global partition state schema class.
+ */
+@Schema(description = "Information about global partition state.")
+public class GlobalPartitionStateResponse {
+    private final int partitionId;
+    private final String tableName;
+    private final String state;
+
+    /**
+     * Constructor.
+     */
+    @JsonCreator
+    public GlobalPartitionStateResponse(
+            @JsonProperty("partitionId") int partitionId,
+            @JsonProperty("tableName") String tableName,
+            @JsonProperty("state") String state
+    ) {
+        this.partitionId = partitionId;
+        this.tableName = tableName;
+        this.state = state;
+    }
+
+    @JsonGetter("partitionId")
+    public int partitionId() {
+        return partitionId;
+    }
+
+    @JsonGetter("tableName")
+    public String tableName() {
+        return tableName;
+    }
+
+    @JsonGetter("state")
+    public String state() {
+        return state;
+    }
+}
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStatesResponse.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStatesResponse.java
new file mode 100644
index 0000000..5031abe
--- /dev/null
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStatesResponse.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.media.Schema;
+import java.util.List;
+
+/**
+ * Global partition states schema class.
+ */
+@Schema(description = "Information about global partition states.")
+public class GlobalPartitionStatesResponse {
+    @Schema
+    private final List<GlobalPartitionStateResponse> states;
+
+    @JsonCreator
+    public GlobalPartitionStatesResponse(@JsonProperty("states") List<GlobalPartitionStateResponse> states) {
+        this.states = List.copyOf(states);
+    }
+
+    @JsonGetter("states")
+    public List<GlobalPartitionStateResponse> states() {
+        return states;
+    }
+}
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java
new file mode 100644
index 0000000..c9d45b5
--- /dev/null
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.media.Schema;
+
+/**
+ * Local partition state schema class.
+ */
+@Schema(description = "Information about local partition state.")
+public class LocalPartitionStateResponse {
+    private final int partitionId;
+    private final String tableName;
+    private final String nodeName;
+    private final String state;
+
+    /**
+     * Constructor.
+     */
+    @JsonCreator
+    public LocalPartitionStateResponse(
+            @JsonProperty("partitionId") int partitionId,
+            @JsonProperty("tableName") String tableName,
+            @JsonProperty("nodeName") String nodeName,
+            @JsonProperty("state") String state
+    ) {
+        this.partitionId = partitionId;
+        this.tableName = tableName;
+        this.nodeName = nodeName;
+        this.state = state;
+    }
+
+    @JsonGetter("partitionId")
+    public int partitionId() {
+        return partitionId;
+    }
+
+    @JsonGetter("tableName")
+    public String tableName() {
+        return tableName;
+    }
+
+    @JsonGetter("nodeName")
+    public String nodeName() {
+        return nodeName;
+    }
+
+    @JsonGetter("state")
+    public String state() {
+        return state;
+    }
+}
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStatesResponse.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStatesResponse.java
new file mode 100644
index 0000000..21c1174
--- /dev/null
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStatesResponse.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.media.Schema;
+import java.util.List;
+
+/**
+ * Local partition states schema class.
+ */
+@Schema(description = "Information about local partition states.")
+public class LocalPartitionStatesResponse {
+    @Schema
+    private final List<LocalPartitionStateResponse> states;
+
+    @JsonCreator
+    public LocalPartitionStatesResponse(@JsonProperty("states") List<LocalPartitionStateResponse> states) {
+        this.states = List.copyOf(states);
+    }
+
+    @JsonGetter("states")
+    public List<LocalPartitionStateResponse> states() {
+        return states;
+    }
+}
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
index 6937a34..98e0d33 100644
--- a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
@@ -26,6 +26,7 @@
 import org.apache.ignite.internal.rest.api.Problem;
 import org.apache.ignite.internal.rest.constants.HttpCode;
 import org.apache.ignite.internal.rest.problem.HttpProblemResponse;
+import org.apache.ignite.lang.ErrorGroups.DistributionZones;
 
 /**
  * Handles {@link IgniteInternalException} and represents it as a rest response.
@@ -36,6 +37,15 @@
 
     @Override
     public HttpResponse<? extends Problem> handle(HttpRequest request, IgniteInternalException exception) {
+        if (exception.code() == DistributionZones.ZONE_NOT_FOUND_ERR) {
+            return HttpProblemResponse.from(
+                    Problem.fromHttpCode(HttpCode.NOT_FOUND)
+                            .detail(exception.getMessage())
+                            .traceId(exception.traceId())
+                            .code(exception.codeAsString())
+            );
+        }
+
         return HttpProblemResponse.from(
                 Problem.fromHttpCode(HttpCode.INTERNAL_SERVER_ERROR)
                         .traceId(exception.traceId())
diff --git a/modules/rest/build.gradle b/modules/rest/build.gradle
index a7592c8..da836c6 100644
--- a/modules/rest/build.gradle
+++ b/modules/rest/build.gradle
@@ -36,6 +36,7 @@
     implementation project(':ignite-network')
     implementation project(':ignite-cluster-management')
     implementation project(':ignite-metrics')
+    implementation project(':ignite-table')
     implementation project(':ignite-code-deployment')
     implementation project(':ignite-security-api')
     implementation project(':ignite-compute')
@@ -72,6 +73,7 @@
     integrationTestImplementation project(':ignite-rest-api')
     integrationTestImplementation project(':ignite-network')
     integrationTestImplementation project(':ignite-api')
+    integrationTestImplementation project(':ignite-catalog')
     integrationTestImplementation project(':ignite-security')
     integrationTestImplementation project(':ignite-code-deployment')
     integrationTestImplementation project(':ignite-runner')
diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
new file mode 100644
index 0000000..2d3a92a
--- /dev/null
+++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.recovery;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.IntStream.range;
+import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import io.micronaut.http.HttpStatus;
+import io.micronaut.http.client.HttpClient;
+import io.micronaut.http.client.annotation.Client;
+import io.micronaut.http.client.exceptions.HttpClientResponseException;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import jakarta.inject.Inject;
+import java.util.List;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse;
+import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse;
+import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse;
+import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for disaster recovery REST commands.
+ */
+@MicronautTest
+public class ItDisasterRecoveryControllerTest extends ClusterPerTestIntegrationTest {
+    private static final String NODE_URL = "http://localhost:" + Cluster.BASE_HTTP_PORT;
+
+    @Inject
+    @Client(NODE_URL + "/management/v1/recovery/")
+    HttpClient client;
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @Test
+    void testLocalPartitionStates() {
+        executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT)");
+        var response = client.toBlocking().exchange("/state/local/", LocalPartitionStatesResponse.class);
+
+        assertEquals(HttpStatus.OK, response.status());
+
+        LocalPartitionStatesResponse body = response.body();
+        assertEquals(DEFAULT_PARTITION_COUNT, body.states().size());
+
+        List<Integer> partitionIds = body.states().stream().map(LocalPartitionStateResponse::partitionId).collect(toList());
+        assertEquals(range(0, DEFAULT_PARTITION_COUNT).boxed().collect(toList()), partitionIds);
+    }
+
+    @Test
+    void testLocalPartitionStatesByZoneMissingZone() {
+        HttpClientResponseException thrown = assertThrows(
+                HttpClientResponseException.class,
+                () -> client.toBlocking().exchange("/state/local/no-such-zone/", LocalPartitionStatesResponse.class)
+        );
+
+        assertEquals(HttpStatus.NOT_FOUND, thrown.getResponse().status());
+    }
+
+    @Test
+    void testLocalPartitionStatesByZone() {
+        executeSql("CREATE TABLE def (id INT PRIMARY KEY, val INT)");
+
+        executeSql("CREATE ZONE foo WITH partitions=1, storage_profiles='" + DEFAULT_AIPERSIST_PROFILE_NAME + "'");
+        executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT) WITH PRIMARY_ZONE = 'FOO'");
+
+        var response = client.toBlocking().exchange("/state/local/Default/", LocalPartitionStatesResponse.class);
+
+        assertEquals(HttpStatus.OK, response.status());
+        assertEquals(DEFAULT_PARTITION_COUNT, response.body().states().size());
+
+        response = client.toBlocking().exchange("/state/local/FOO/", LocalPartitionStatesResponse.class);
+
+        assertEquals(HttpStatus.OK, response.status());
+
+        List<LocalPartitionStateResponse> states = response.body().states();
+        assertEquals(1, states.size());
+
+        LocalPartitionStateResponse state = states.get(0);
+        assertEquals(0, state.partitionId());
+        assertEquals("idrct_tlpsbz_0", state.nodeName());
+        assertEquals("FOO", state.tableName());
+        assertEquals("HEALTHY", state.state());
+    }
+
+    @Test
+    void testGlobalPartitionStates() {
+        executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT)");
+        var response = client.toBlocking().exchange("/state/global/", GlobalPartitionStatesResponse.class);
+
+        assertEquals(HttpStatus.OK, response.status());
+
+        GlobalPartitionStatesResponse body = response.body();
+        assertEquals(DEFAULT_PARTITION_COUNT, body.states().size());
+
+        List<Integer> partitionIds = body.states().stream().map(GlobalPartitionStateResponse::partitionId).collect(toList());
+        assertEquals(range(0, DEFAULT_PARTITION_COUNT).boxed().collect(toList()), partitionIds);
+    }
+
+    @Test
+    void testGlobalPartitionStatesByZoneMissingZone() {
+        HttpClientResponseException thrown = assertThrows(
+                HttpClientResponseException.class,
+                () -> client.toBlocking().exchange("/state/global/no-such-zone/", GlobalPartitionStatesResponse.class)
+        );
+
+        assertEquals(HttpStatus.NOT_FOUND, thrown.getResponse().status());
+    }
+
+    @Test
+    void testGlobalPartitionStatesByZone() {
+        executeSql("CREATE TABLE def (id INT PRIMARY KEY, val INT)");
+
+        executeSql("CREATE ZONE foo WITH partitions=1, storage_profiles='" + DEFAULT_AIPERSIST_PROFILE_NAME + "'");
+        executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT) WITH PRIMARY_ZONE = 'FOO'");
+
+        var response = client.toBlocking().exchange("/state/global/Default/", GlobalPartitionStatesResponse.class);
+
+        assertEquals(HttpStatus.OK, response.status());
+        assertEquals(DEFAULT_PARTITION_COUNT, response.body().states().size());
+
+        response = client.toBlocking().exchange("/state/global/FOO/", GlobalPartitionStatesResponse.class);
+
+        assertEquals(HttpStatus.OK, response.status());
+
+        List<GlobalPartitionStateResponse> states = response.body().states();
+        assertEquals(1, states.size());
+
+        GlobalPartitionStateResponse state = states.get(0);
+        assertEquals(0, state.partitionId());
+        assertEquals("FOO", state.tableName());
+        assertEquals("AVAILABLE", state.state());
+    }
+}
diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
new file mode 100644
index 0000000..4598ab9
--- /dev/null
+++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.recovery;
+
+import static java.util.Comparator.comparing;
+
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.http.annotation.Controller;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.rest.api.recovery.DisasterRecoveryApi;
+import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse;
+import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse;
+import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse;
+import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse;
+import org.apache.ignite.internal.rest.exception.handler.IgniteInternalExceptionHandler;
+import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
+import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
+import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionState;
+
+/**
+ * Disaster recovery controller.
+ */
+@Controller("/management/v1/recovery/")
+@Requires(classes = IgniteInternalExceptionHandler.class)
+public class DisasterRecoveryController implements DisasterRecoveryApi {
+    private final DisasterRecoveryManager disasterRecoveryManager;
+
+    public DisasterRecoveryController(DisasterRecoveryManager disasterRecoveryManager) {
+        this.disasterRecoveryManager = disasterRecoveryManager;
+    }
+
+    @Override
+    public CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates() {
+        return disasterRecoveryManager.localPartitionStates(null).thenApply(DisasterRecoveryController::convertLocalStates);
+    }
+
+    @Override
+    public CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates(String zoneName) {
+        return disasterRecoveryManager.localPartitionStates(zoneName).thenApply(DisasterRecoveryController::convertLocalStates);
+    }
+
+    @Override
+    public CompletableFuture<GlobalPartitionStatesResponse> getGlobalPartitionStates() {
+        return disasterRecoveryManager.globalPartitionStates(null).thenApply(DisasterRecoveryController::convertGlobalStates);
+    }
+
+    @Override
+    public CompletableFuture<GlobalPartitionStatesResponse> getGlobalPartitionStates(String zoneName) {
+        return disasterRecoveryManager.globalPartitionStates(zoneName).thenApply(DisasterRecoveryController::convertGlobalStates);
+    }
+
+    private static LocalPartitionStatesResponse convertLocalStates(Map<TablePartitionId, Map<String, LocalPartitionState>> localStates) {
+        List<LocalPartitionStateResponse> states = new ArrayList<>();
+
+        for (Map<String, LocalPartitionState> map : localStates.values()) {
+            for (Entry<String, LocalPartitionState> entry : map.entrySet()) {
+                String nodeName = entry.getKey();
+                LocalPartitionState state = entry.getValue();
+
+                states.add(new LocalPartitionStateResponse(
+                        state.partitionId,
+                        state.tableName,
+                        nodeName,
+                        state.state.name()
+                ));
+            }
+        }
+
+        // Sort the output conveniently.
+        states.sort(comparing(LocalPartitionStateResponse::tableName)
+                .thenComparingInt(LocalPartitionStateResponse::partitionId)
+                .thenComparing(LocalPartitionStateResponse::nodeName));
+
+        return new LocalPartitionStatesResponse(states);
+    }
+
+    private static GlobalPartitionStatesResponse convertGlobalStates(Map<TablePartitionId, GlobalPartitionState> globalStates) {
+        List<GlobalPartitionStateResponse> states = new ArrayList<>();
+
+        for (GlobalPartitionState state : globalStates.values()) {
+            states.add(new GlobalPartitionStateResponse(
+                    state.partitionId,
+                    state.tableName,
+                    state.state.name()
+            ));
+        }
+
+        // Sort the output conveniently.
+        states.sort(comparing(GlobalPartitionStateResponse::tableName)
+                .thenComparingInt(GlobalPartitionStateResponse::partitionId));
+
+        return new GlobalPartitionStatesResponse(states);
+    }
+}
diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryFactory.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryFactory.java
new file mode 100644
index 0000000..a5b3814
--- /dev/null
+++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.recovery;
+
+import io.micronaut.context.annotation.Bean;
+import io.micronaut.context.annotation.Factory;
+import jakarta.inject.Singleton;
+import org.apache.ignite.internal.rest.RestFactory;
+import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
+
+/**
+ * Factory that defines beans required for the rest module.
+ */
+@Factory
+public class DisasterRecoveryFactory implements RestFactory {
+    private DisasterRecoveryManager disasterRecoveryManager;
+
+    public DisasterRecoveryFactory(DisasterRecoveryManager disasterRecoveryManager) {
+        this.disasterRecoveryManager = disasterRecoveryManager;
+    }
+
+    @Bean
+    @Singleton
+    public DisasterRecoveryManager disasterRecoveryManager() {
+        return disasterRecoveryManager;
+    }
+
+    @Override
+    public void cleanResources() {
+        disasterRecoveryManager = null;
+    }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index ce4fc2e..27dba1d 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -170,6 +170,7 @@
 import org.apache.ignite.internal.rest.deployment.CodeDeploymentRestFactory;
 import org.apache.ignite.internal.rest.metrics.MetricRestFactory;
 import org.apache.ignite.internal.rest.node.NodeManagementRestFactory;
+import org.apache.ignite.internal.rest.recovery.DisasterRecoveryFactory;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.configuration.GcConfiguration;
 import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
@@ -935,6 +936,7 @@
         Supplier<RestFactory> deploymentCodeRestFactory = () -> new CodeDeploymentRestFactory(deploymentManager);
         Supplier<RestFactory> restManagerFactory = () -> new RestManagerFactory(restManager);
         Supplier<RestFactory> computeRestFactory = () -> new ComputeRestFactory(compute);
+        Supplier<RestFactory> disasterRecoveryFactory = () -> new DisasterRecoveryFactory(disasterRecoveryManager);
 
         RestConfiguration restConfiguration = nodeCfgMgr.configurationRegistry().getConfiguration(RestConfiguration.KEY);
 
@@ -946,7 +948,8 @@
                         deploymentCodeRestFactory,
                         authProviderFactory,
                         restManagerFactory,
-                        computeRestFactory
+                        computeRestFactory,
+                        disasterRecoveryFactory
                 ),
                 restManager,
                 restConfiguration
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index d7a980c..b0d5028 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -60,7 +60,7 @@
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
-import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionState;
+import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionState;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.table.KeyValueView;
@@ -326,7 +326,7 @@
     }
 
     private List<Integer> getRealAssignments(IgniteImpl node0, int partId) {
-        var partitionStatesFut = node0.disasterRecoveryManager().partitionStates(zoneName);
+        var partitionStatesFut = node0.disasterRecoveryManager().localPartitionStates(zoneName);
         assertThat(partitionStatesFut, willCompleteSuccessfully());
 
         Map<String, LocalPartitionState> partitionStates = partitionStatesFut.join().get(new TablePartitionId(tableId, partId));
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 63cc832..3b1a1dc 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -28,7 +28,7 @@
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand;
 import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
-import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionState;
+import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStateMessage;
 import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesRequest;
 import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesResponse;
 import org.apache.ignite.internal.table.distributed.message.HasDataRequest;
@@ -216,7 +216,7 @@
      * Messages for {@link DisasterRecoveryManager}.
      */
     interface DisasterRecoveryMessages {
-        /** Message type for {@link LocalPartitionState}. */
+        /** Message type for {@link LocalPartitionStateMessage}. */
         short LOCAL_PARTITION_STATE = 100;
 
         /** Message type for {@link LocalPartitionStatesRequest}. */
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 98bb2ca..0961c1a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -17,6 +17,17 @@
 
 package org.apache.ignite.internal.table.distributed.disaster;
 
+import static java.util.Collections.emptyList;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.AVAILABLE;
+import static org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.DEGRADED;
+import static org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.READ_ONLY;
+import static org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.BROKEN;
+import static org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.CATCHING_UP;
+import static org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.HEALTHY;
+import static org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.INITIALIZING;
+import static org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.INSTALLING_SNAPSHOT;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.ArrayList;
@@ -30,8 +41,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import org.apache.ignite.internal.affinity.Assignments;
+import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
@@ -39,6 +50,7 @@
 import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
 import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
 import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
@@ -52,7 +64,7 @@
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
-import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionState;
+import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStateMessage;
 import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesRequest;
 import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesResponse;
 import org.apache.ignite.internal.util.ByteUtils;
@@ -76,9 +88,17 @@
 
     private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory();
 
-    private static final int TIMEOUT = 30;
+    /** Disaster recovery operations timeout in seconds. */
+    private static final int TIMEOUT_SECONDS = 30;
 
-    private static final int CATCH_UP_THRESHOLD = 10;
+    /**
+     * Maximal allowed difference between committed index on the leader and on the follower, that differentiates
+     * {@link LocalPartitionStateEnum#HEALTHY} from {@link LocalPartitionStateEnum#CATCHING_UP}.
+     */
+    private static final int CATCH_UP_THRESHOLD = 100;
+
+    /** Zone ID that corresponds to "all zones". */
+    private static final int NO_ZONE_ID = -1;
 
     /** Thread pool executor for async parts. */
     private final ExecutorService threadPool;
@@ -152,6 +172,10 @@
     @Override
     public void stop() throws Exception {
         metaStorageManager.unregisterWatch(watchListener);
+
+        for (CompletableFuture<Void> future : ongoingOperationsById.values()) {
+            future.completeExceptionally(new NodeStoppingException());
+        }
     }
 
     /**
@@ -172,29 +196,58 @@
      * Returns partition states for all zones' partitions in the cluster. Result is a mapping of {@link TablePartitionId} to the mapping
      * between a node name and a partition state.
      *
-     * @param zoneName Zone name.
+     * @param zoneName Zone name. {@code null} means "all zones".
      * @return Future with the mapping.
      */
-    public CompletableFuture<Map<TablePartitionId, Map<String, LocalPartitionState>>> partitionStates(String zoneName) {
-        int latestCatalogVersion = catalogManager.latestCatalogVersion();
-        Optional<CatalogZoneDescriptor> zoneDesciptorOptional = catalogManager.zones(latestCatalogVersion).stream()
-                .filter(catalogZoneDescriptor -> catalogZoneDescriptor.name().equals(zoneName))
-                .findAny();
+    public CompletableFuture<Map<TablePartitionId, Map<String, LocalPartitionState>>> localPartitionStates(@Nullable String zoneName) {
+        Catalog catalog = catalogManager.catalog(catalogManager.latestCatalogVersion());
 
-        if (zoneDesciptorOptional.isEmpty()) {
-            return CompletableFuture.failedFuture(new DistributionZoneNotFoundException(zoneName, null));
+        return localPartitionStatesInternal(zoneName, catalog)
+                .thenApply(res -> normalizeLocal(res, catalog));
+    }
+
+    /**
+     * Returns partition states for all zones' partitions in the cluster. Result is a mapping of {@link TablePartitionId} to the global
+     * partition state enum value.
+     *
+     * @param zoneName Zone name. {@code null} means "all zones".
+     * @return Future with the mapping.
+     */
+    public CompletableFuture<Map<TablePartitionId, GlobalPartitionState>> globalPartitionStates(@Nullable String zoneName) {
+        Catalog catalog = catalogManager.catalog(catalogManager.latestCatalogVersion());
+
+        return localPartitionStatesInternal(zoneName, catalog)
+                .thenApply(res -> normalizeLocal(res, catalog))
+                .thenApply(res -> assembleGlobal(res, catalog));
+    }
+
+    private CompletableFuture<Map<TablePartitionId, Map<String, LocalPartitionStateMessage>>> localPartitionStatesInternal(
+            @Nullable String zoneName, Catalog catalog
+    ) {
+        int zoneId;
+        if (zoneName == null) {
+            zoneId = NO_ZONE_ID;
+        } else {
+            Optional<CatalogZoneDescriptor> zoneDesciptorOptional = catalog.zones().stream()
+                    .filter(catalogZoneDescriptor -> catalogZoneDescriptor.name().equals(zoneName))
+                    .findAny();
+
+            if (zoneDesciptorOptional.isEmpty()) {
+                return CompletableFuture.failedFuture(new DistributionZoneNotFoundException(zoneName, null));
+            }
+
+            CatalogZoneDescriptor zoneDescriptor = zoneDesciptorOptional.get();
+            zoneId = zoneDescriptor.id();
         }
 
-        CatalogZoneDescriptor zoneDescriptor = zoneDesciptorOptional.get();
-
         Set<NodeWithAttributes> logicalTopology = dzManager.logicalTopology();
 
         LocalPartitionStatesRequest localPartitionStatesRequest = MSG_FACTORY.localPartitionStatesRequest()
-                .zoneId(zoneDescriptor.id())
-                .catalogVersion(latestCatalogVersion)
+                .zoneId(zoneId)
+                .catalogVersion(catalog.version())
                 .build();
 
-        Map<TablePartitionId, Map<String, LocalPartitionState>> result = new ConcurrentHashMap<>();
+        Map<TablePartitionId, Map<String, LocalPartitionStateMessage>> result = new ConcurrentHashMap<>();
         CompletableFuture<?>[] futures = new CompletableFuture[logicalTopology.size()];
 
         int i = 0;
@@ -202,7 +255,7 @@
             CompletableFuture<NetworkMessage> invokeFuture = messagingService.invoke(
                     node.nodeName(),
                     localPartitionStatesRequest,
-                    TimeUnit.SECONDS.toMillis(TIMEOUT)
+                    TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS)
             );
 
             futures[i++] = invokeFuture.thenAccept(networkMessage -> {
@@ -210,7 +263,7 @@
 
                 var response = (LocalPartitionStatesResponse) networkMessage;
 
-                for (LocalPartitionState state : response.states()) {
+                for (LocalPartitionStateMessage state : response.states()) {
                     result.compute(state.partitionId().asTablePartitionId(), (tablePartitionId, map) -> {
                         if (map == null) {
                             return Map.of(node.nodeName(), state);
@@ -224,7 +277,7 @@
             });
         }
 
-        return CompletableFuture.allOf(futures).handle((unused, throwable) -> normalize(result));
+        return CompletableFuture.allOf(futures).handle((unused, throwable) -> result);
     }
 
     /**
@@ -238,7 +291,7 @@
 
         CompletableFuture<Void> operationFuture = new CompletableFuture<Void>()
                 .whenComplete((v, throwable) -> ongoingOperationsById.remove(operationId))
-                .orTimeout(TIMEOUT, TimeUnit.SECONDS);
+                .orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
         ongoingOperationsById.put(operationId, operationFuture);
 
@@ -287,14 +340,15 @@
 
         int catalogVersion = request.catalogVersion();
         catalogManager.catalogReadyFuture(catalogVersion).thenRunAsync(() -> {
-            List<LocalPartitionState> statesList = new ArrayList<>();
+            List<LocalPartitionStateMessage> statesList = new ArrayList<>();
 
             raftManager.forEach((raftNodeId, raftGroupService) -> {
                 if (raftNodeId.groupId() instanceof TablePartitionId) {
                     var tablePartitionId = (TablePartitionId) raftNodeId.groupId();
 
                     CatalogTableDescriptor tableDescriptor = catalogManager.table(tablePartitionId.tableId(), catalogVersion);
-                    if (tableDescriptor == null || tableDescriptor.zoneId() != request.zoneId()) {
+                    // Only tables that belong to a specific catalog version will be returned.
+                    if (tableDescriptor == null || request.zoneId() != NO_ZONE_ID && tableDescriptor.zoneId() != request.zoneId()) {
                         return;
                     }
 
@@ -304,18 +358,18 @@
                     LocalPartitionStateEnum localState = convertState(nodeState);
                     long lastLogIndex = raftNode.lastLogIndex();
 
-                    if (localState == LocalPartitionStateEnum.HEALTHY) {
+                    if (localState == HEALTHY) {
                         // Node without log didn't process anything yet, it's not really "healthy" before it accepts leader's configuration.
                         if (lastLogIndex == 0) {
-                            localState = LocalPartitionStateEnum.INITIALIZING;
+                            localState = INITIALIZING;
                         }
 
                         if (raftNode.isInstallingSnapshot()) {
-                            localState = LocalPartitionStateEnum.INSTALLING_SNAPSHOT;
+                            localState = INSTALLING_SNAPSHOT;
                         }
                     }
 
-                    statesList.add(MSG_FACTORY.localPartitionState()
+                    statesList.add(MSG_FACTORY.localPartitionStateMessage()
                             .partitionId(MSG_FACTORY.tablePartitionIdMessage()
                                     .tableId(tablePartitionId.tableId())
                                     .partitionId(tablePartitionId.partitionId())
@@ -342,13 +396,13 @@
             case STATE_TRANSFERRING:
             case STATE_CANDIDATE:
             case STATE_FOLLOWER:
-                return LocalPartitionStateEnum.HEALTHY;
+                return HEALTHY;
 
             case STATE_ERROR:
-                return LocalPartitionStateEnum.BROKEN;
+                return BROKEN;
 
             case STATE_UNINITIALIZED:
-                return LocalPartitionStateEnum.INITIALIZING;
+                return INITIALIZING;
 
             case STATE_SHUTTING:
             case STATE_SHUTDOWN:
@@ -357,36 +411,105 @@
 
             default:
                 // Unrecognized state, better safe than sorry.
-                return LocalPartitionStateEnum.BROKEN;
+                return BROKEN;
         }
     }
 
     /**
-     * Replaces some healthy states with a {@link LocalPartitionStateEnum#CATCHING_UP},it can only be done once the state of all peers is
+     * Replaces some healthy states with a {@link LocalPartitionStateEnum#CATCHING_UP}, it can only be done once the state of all peers is
      * known.
      */
-    private Map<TablePartitionId, Map<String, LocalPartitionState>> normalize(
-            Map<TablePartitionId, Map<String, LocalPartitionState>> result
+    private static Map<TablePartitionId, Map<String, LocalPartitionState>> normalizeLocal(
+            Map<TablePartitionId, Map<String, LocalPartitionStateMessage>> result,
+            Catalog catalog
     ) {
-        return result.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
-            Map<String, LocalPartitionState> map = entry.getValue();
+        return result.entrySet().stream().collect(toMap(Map.Entry::getKey, entry -> {
+            TablePartitionId tablePartitionId = entry.getKey();
+            Map<String, LocalPartitionStateMessage> map = entry.getValue();
 
             // noinspection OptionalGetWithoutIsPresent
-            long maxLogIndex = map.values().stream().mapToLong(LocalPartitionState::logIndex).max().getAsLong();
+            long maxLogIndex = map.values().stream().mapToLong(LocalPartitionStateMessage::logIndex).max().getAsLong();
 
-            return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry2 -> {
-                LocalPartitionState state = entry2.getValue();
+            return map.entrySet().stream().collect(toMap(Map.Entry::getKey, entry2 -> {
+                LocalPartitionStateMessage stateMsg = entry2.getValue();
 
-                if (state.state() != LocalPartitionStateEnum.HEALTHY || maxLogIndex - state.logIndex() < CATCH_UP_THRESHOLD) {
-                    return state;
+                LocalPartitionStateEnum stateEnum = stateMsg.state();
+
+                if (stateMsg.state() == HEALTHY && maxLogIndex - stateMsg.logIndex() >= CATCH_UP_THRESHOLD) {
+                    stateEnum = CATCHING_UP;
                 }
 
-                return MSG_FACTORY.localPartitionState()
-                        .state(LocalPartitionStateEnum.CATCHING_UP)
-                        .partitionId(state.partitionId())
-                        .logIndex(state.logIndex())
-                        .build();
+                // Tables, returned from local states request, are always present in the required version of the catalog.
+                CatalogTableDescriptor tableDescriptor = catalog.table(tablePartitionId.tableId());
+                return new LocalPartitionState(tableDescriptor.name(), tablePartitionId.partitionId(), stateEnum);
             }));
         }));
     }
+
+    private static Map<TablePartitionId, GlobalPartitionState> assembleGlobal(
+            Map<TablePartitionId, Map<String, LocalPartitionState>> localResult,
+            Catalog catalog
+    ) {
+        Map<TablePartitionId, GlobalPartitionState> result = localResult.entrySet().stream()
+                .collect(toMap(Map.Entry::getKey, entry -> {
+                    TablePartitionId tablePartitionId = entry.getKey();
+                    Map<String, LocalPartitionState> map = entry.getValue();
+
+                    return assembleGlobalStateFromLocal(catalog, tablePartitionId, map);
+                }));
+
+        localResult.keySet().stream()
+                .map(TablePartitionId::tableId)
+                .distinct()
+                .forEach(tableId -> {
+                    int zoneId = catalog.table(tableId).zoneId();
+                    CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+                    int partitions = zoneDescriptor.partitions();
+
+                    // Make missing partitions explicitly unavailable.
+                    for (int partitionId = 0; partitionId < partitions; partitionId++) {
+                        TablePartitionId tablePartitionId = new TablePartitionId(tableId, partitionId);
+
+                        result.computeIfAbsent(tablePartitionId, key ->
+                                new GlobalPartitionState(catalog.table(key.tableId()).name(), key.partitionId(),
+                                        GlobalPartitionStateEnum.UNAVAILABLE)
+                        );
+                    }
+                });
+
+        return result;
+    }
+
+    private static GlobalPartitionState assembleGlobalStateFromLocal(
+            Catalog catalog,
+            TablePartitionId tablePartitionId,
+            Map<String, LocalPartitionState> map
+    ) {
+        // Tables, returned from local states request, are always present in the required version of the catalog.
+        int zoneId = catalog.table(tablePartitionId.tableId()).zoneId();
+        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+
+        int replicas = zoneDescriptor.replicas();
+        int quorum = replicas / 2 + 1;
+
+        Map<LocalPartitionStateEnum, List<LocalPartitionState>> groupedStates = map.values().stream()
+                .collect(groupingBy(localPartitionState -> localPartitionState.state));
+
+        GlobalPartitionStateEnum globalStateEnum;
+
+        int healthyReplicas = groupedStates.getOrDefault(HEALTHY, emptyList()).size();
+
+        if (healthyReplicas == replicas) {
+            globalStateEnum = AVAILABLE;
+        } else if (healthyReplicas >= quorum) {
+            globalStateEnum = DEGRADED;
+        } else if (healthyReplicas > 0) {
+            globalStateEnum = READ_ONLY;
+        } else {
+            globalStateEnum = GlobalPartitionStateEnum.UNAVAILABLE;
+        }
+
+        LocalPartitionState anyLocalState = map.values().iterator().next();
+        return new GlobalPartitionState(anyLocalState.tableName, tablePartitionId.partitionId(), globalStateEnum);
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionState.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionState.java
new file mode 100644
index 0000000..816bfc6
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionState.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+/**
+ * Global partition state.
+ */
+public class GlobalPartitionState {
+    public final String tableName;
+
+    public final int partitionId;
+
+    public final GlobalPartitionStateEnum state;
+
+    GlobalPartitionState(String tableName, int partitionId, GlobalPartitionStateEnum state) {
+        this.tableName = tableName;
+        this.partitionId = partitionId;
+        this.state = state;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionStateEnum.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionStateEnum.java
new file mode 100644
index 0000000..f2b6be6
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionStateEnum.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+/**
+ * Enum for states of partitions.
+ */
+public enum GlobalPartitionStateEnum {
+    /** All replicas are healthy. */
+    AVAILABLE,
+
+    /** There are healthy replicas, and they form a majority. */
+    DEGRADED,
+
+    /** There are healthy replicas, but they don't form a majority. */
+    READ_ONLY,
+
+    /** There are no healthy replicas. */
+    UNAVAILABLE
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
new file mode 100644
index 0000000..0db3f39
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+/**
+ * Local partition state.
+ */
+public class LocalPartitionState {
+    public final String tableName;
+
+    public final int partitionId;
+
+    public final LocalPartitionStateEnum state;
+
+    LocalPartitionState(String tableName, int partitionId, LocalPartitionStateEnum state) {
+        this.tableName = tableName;
+        this.partitionId = partitionId;
+        this.state = state;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionState.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStateMessage.java
similarity index 95%
rename from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionState.java
rename to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStateMessage.java
index 8f9971d..49e9e7c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionState.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStateMessage.java
@@ -28,7 +28,7 @@
  * Local partition state message, has partition ID, state and last committed log index.
  */
 @Transferable(DisasterRecoveryMessages.LOCAL_PARTITION_STATE)
-public interface LocalPartitionState extends NetworkMessage {
+public interface LocalPartitionStateMessage extends NetworkMessage {
     /** Partition ID. */
     TablePartitionIdMessage partitionId();
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStatesResponse.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStatesResponse.java
index b1ba925..e55f978 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStatesResponse.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStatesResponse.java
@@ -27,5 +27,5 @@
  */
 @Transferable(DisasterRecoveryMessages.LOCAL_PARTITION_STATE_RESPONSE)
 public interface LocalPartitionStatesResponse extends NetworkMessage {
-    List<LocalPartitionState> states();
+    List<LocalPartitionStateMessage> states();
 }