[IGNITE-21295] Implement REST API for manual raft group configuration update (#3701)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/TableNotFoundException.java b/modules/api/src/main/java/org/apache/ignite/lang/TableNotFoundException.java
index 24763d0..2559ae1 100644
--- a/modules/api/src/main/java/org/apache/ignite/lang/TableNotFoundException.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/TableNotFoundException.java
@@ -38,6 +38,15 @@
     }
 
     /**
+     * Creates an exception with the given fully-qualified table name.
+     *
+     * @param tableName Fully-qualified table name.
+     */
+    public TableNotFoundException(String tableName) {
+        super(TABLE_NOT_FOUND_ERR, "The table does not exist [name=" + tableName + ']');
+    }
+
+    /**
      * Creates an exception with the given trace ID, error code, detailed message, and cause.
      *
      * @param traceId Unique identifier of the exception.
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
index 301a451..f35261e 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
@@ -20,6 +20,9 @@
 import static it.unimi.dsi.fastutil.ints.Int2ObjectMaps.unmodifiable;
 import static java.util.Collections.unmodifiableList;
 import static java.util.Comparator.comparingInt;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toUnmodifiableMap;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap.Entry;
@@ -28,12 +31,11 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.function.Function;
 import java.util.stream.Collector;
-import java.util.stream.Collectors;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
@@ -49,12 +51,12 @@
  */
 public class Catalog {
     private static <T extends CatalogObjectDescriptor> Collector<T, ?, Map<String, T>> toMapByName() {
-        return Collectors.toUnmodifiableMap(CatalogObjectDescriptor::name, Function.identity());
+        return toUnmodifiableMap(CatalogObjectDescriptor::name, identity());
     }
 
     private static <T extends CatalogObjectDescriptor> Collector<T, ?, Int2ObjectMap<T>> toMapById() {
-        return Collectors.collectingAndThen(
-                CollectionUtils.toIntMapCollector(CatalogObjectDescriptor::id, Function.identity()),
+        return collectingAndThen(
+                CollectionUtils.toIntMapCollector(CatalogObjectDescriptor::id, identity()),
                 Int2ObjectMaps::unmodifiable
         );
     }
@@ -64,6 +66,7 @@
     private final long activationTimestamp;
     private final Map<String, CatalogSchemaDescriptor> schemasByName;
     private final Map<String, CatalogZoneDescriptor> zonesByName;
+    private final Map<String, CatalogTableDescriptor> tablesByName;
     private final @Nullable CatalogZoneDescriptor defaultZone;
 
     @IgniteToStringExclude
@@ -110,6 +113,13 @@
         schemasByName = schemas.stream().collect(toMapByName());
         zonesByName = zones.stream().collect(toMapByName());
 
+        tablesByName = new HashMap<>();
+        for (CatalogSchemaDescriptor schema : schemas) {
+            for (CatalogTableDescriptor table : schema.tables()) {
+                tablesByName.put(schema.name() + "." + table.name(), table);
+            }
+        }
+
         schemasById = schemas.stream().collect(toMapById());
         tablesById = schemas.stream().flatMap(s -> Arrays.stream(s.tables())).collect(toMapById());
         indexesById = schemas.stream().flatMap(s -> Arrays.stream(s.indexes())).collect(toMapById());
@@ -155,6 +165,15 @@
         return tablesById.get(tableId);
     }
 
+    /**
+     * Returns table descriptor by fully-qualified table name.
+     *
+     * @param tableName Fully-qualified table name. Case-sensitive, without quotes.
+     * */
+    public @Nullable CatalogTableDescriptor table(String tableName) {
+        return tablesByName.get(tableName);
+    }
+
     public Collection<CatalogTableDescriptor> tables() {
         return tablesById.values();
     }
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
index 59750bd..1137538 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
@@ -42,6 +42,15 @@
      * The constructor.
      *
      * @param zoneName Zone name.
+     */
+    public DistributionZoneNotFoundException(String zoneName) {
+        super(ZONE_NOT_FOUND_ERR, "Distribution zone is not found [zoneName=" + zoneName + ']');
+    }
+
+    /**
+     * The constructor.
+     *
+     * @param zoneName Zone name.
      * @param cause Optional nested exception (can be {@code null}).
      */
     public DistributionZoneNotFoundException(String zoneName, @Nullable Throwable cause) {
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index bf97314..6fdec03 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -17,7 +17,8 @@
 
 package org.apache.ignite.internal.distributionzones.rebalance;
 
-import static java.util.Collections.emptyList;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.ASSIGNMENT_NOT_UPDATED;
 import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.OUTDATED_UPDATE_RECEIVED;
 import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.PENDING_KEY_UPDATED;
@@ -36,7 +37,6 @@
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -44,7 +44,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.affinity.AffinityUtils;
 import org.apache.ignite.internal.affinity.Assignment;
@@ -282,9 +281,10 @@
             long storageRevision,
             MetaStorageManager metaStorageManager
     ) {
-        CompletableFuture<List<Assignments>> tableAssignmentsFut = tableAssignments(
+        CompletableFuture<Map<Integer, Assignments>> tableAssignmentsFut = tableAssignments(
                 metaStorageManager,
                 tableDescriptor.id(),
+                Set.of(),
                 zoneDescriptor.partitions()
         );
 
@@ -321,6 +321,7 @@
      *
      * @param tableDescriptor Table descriptor.
      * @param zoneDescriptor Zone descriptor.
+     * @param partitionIds Partitions IDs to force assignments for. If empty, reassigns all zone's partitions.
      * @param dataNodes Current DZ data nodes.
      * @param aliveNodesConsistentIds Set of alive nodes according to logical topology.
      * @param revision Meta-storage revision to be associated with reassignment.
@@ -330,25 +331,31 @@
     public static CompletableFuture<?>[] forceAssignmentsUpdate(
             CatalogTableDescriptor tableDescriptor,
             CatalogZoneDescriptor zoneDescriptor,
+            Set<Integer> partitionIds,
             Set<String> dataNodes,
             Set<String> aliveNodesConsistentIds,
             long revision,
             MetaStorageManager metaStorageManager
     ) {
-        CompletableFuture<List<Assignments>> tableAssignmentsFut = tableAssignments(
+        CompletableFuture<Map<Integer, Assignments>> tableAssignmentsFut = tableAssignments(
                 metaStorageManager,
                 tableDescriptor.id(),
+                partitionIds,
                 zoneDescriptor.partitions()
         );
 
-        CompletableFuture<?>[] futures = new CompletableFuture[zoneDescriptor.partitions()];
-
         Set<String> aliveDataNodes = CollectionUtils.intersect(dataNodes, aliveNodesConsistentIds);
 
-        for (int partId = 0; partId < zoneDescriptor.partitions(); partId++) {
-            TablePartitionId replicaGrpId = new TablePartitionId(tableDescriptor.id(), partId);
+        int[] ids = partitionIds.isEmpty()
+                ? IntStream.range(0, zoneDescriptor.partitions()).toArray()
+                : partitionIds.stream().mapToInt(Integer::intValue).toArray();
 
-            futures[partId] = tableAssignmentsFut.thenCompose(tableAssignments ->
+        CompletableFuture<?>[] futures = new CompletableFuture[ids.length];
+
+        for (int i = 0; i < ids.length; i++) {
+            TablePartitionId replicaGrpId = new TablePartitionId(tableDescriptor.id(), ids[i]);
+
+            futures[i] = tableAssignmentsFut.thenCompose(tableAssignments ->
                     tableAssignments.isEmpty() ? nullCompletedFuture() : manualPartitionUpdate(
                             replicaGrpId,
                             aliveDataNodes,
@@ -620,7 +627,7 @@
      * @return Result of the subtraction.
      */
     public static <T> Set<T> subtract(Set<T> minuend, Set<T> subtrahend) {
-        return minuend.stream().filter(v -> !subtrahend.contains(v)).collect(Collectors.toSet());
+        return minuend.stream().filter(v -> !subtrahend.contains(v)).collect(toSet());
     }
 
     /**
@@ -646,7 +653,7 @@
      * @return Result of the intersection.
      */
     public static <T> Set<T> intersect(Set<T> op1, Set<T> op2) {
-        return op1.stream().filter(op2::contains).collect(Collectors.toSet());
+        return op1.stream().filter(op2::contains).collect(toSet());
     }
 
     /**
@@ -689,47 +696,53 @@
     }
 
     /**
-     * Returns table assignments for all table partitions from meta storage.
+     * Returns table assignments for table partitions from meta storage.
      *
      * @param metaStorageManager Meta storage manager.
      * @param tableId Table id.
-     * @param numberOfPartitions Number of partitions.
+     * @param partitionIds IDs of partitions to get assignments for. If empty, get all partition assignments.
+     * @param numberOfPartitions Number of partitions. Ignored if partition IDs are specified.
      * @return Future with table assignments as a value.
      */
-    static CompletableFuture<List<Assignments>> tableAssignments(
+    static CompletableFuture<Map<Integer, Assignments>> tableAssignments(
             MetaStorageManager metaStorageManager,
             int tableId,
+            Set<Integer> partitionIds,
             int numberOfPartitions
     ) {
         Map<ByteArray, Integer> partitionKeysToPartitionNumber = new HashMap<>();
 
-        for (int i = 0; i < numberOfPartitions; i++) {
-            partitionKeysToPartitionNumber.put(stablePartAssignmentsKey(new TablePartitionId(tableId, i)), i);
+        Collection<Integer> ids = partitionIds.isEmpty()
+                ? IntStream.range(0, numberOfPartitions).boxed().collect(toList())
+                : partitionIds;
+
+        for (Integer partId : ids) {
+            partitionKeysToPartitionNumber.put(stablePartAssignmentsKey(new TablePartitionId(tableId, partId)), partId);
         }
 
         return metaStorageManager.getAll(partitionKeysToPartitionNumber.keySet())
                 .thenApply(entries -> {
                     if (entries.isEmpty()) {
-                        return emptyList();
+                        return Map.of();
                     }
 
-                    Assignments[] result = new Assignments[numberOfPartitions];
+                    Map<Integer, Assignments> result = new HashMap<>();
                     int numberOfMsPartitions = 0;
 
                     for (var mapEntry : entries.entrySet()) {
                         Entry entry = mapEntry.getValue();
 
                         if (!entry.empty() && !entry.tombstone()) {
-                            result[partitionKeysToPartitionNumber.get(mapEntry.getKey())] = Assignments.fromBytes(entry.value());
+                            result.put(partitionKeysToPartitionNumber.get(mapEntry.getKey()), Assignments.fromBytes(entry.value()));
                             numberOfMsPartitions++;
                         }
                     }
 
-                    assert numberOfMsPartitions == 0 || numberOfMsPartitions == numberOfPartitions
+                    assert numberOfMsPartitions == 0 || numberOfMsPartitions == entries.size()
                             : "Invalid number of stable partition entries received from meta storage [received="
-                            + numberOfMsPartitions + ", numberOfPartitions=" + numberOfPartitions + ", tableId=" + tableId + "].";
+                            + numberOfMsPartitions + ", numberOfPartitions=" + entries.size() + ", tableId=" + tableId + "].";
 
-                    return numberOfMsPartitions == 0 ? emptyList() : Arrays.asList(result);
+                    return numberOfMsPartitions == 0 ? Map.of() : result;
                 });
     }
 
@@ -756,6 +769,6 @@
 
                     return Assignments.fromBytes(e.value());
                 })
-                .collect(Collectors.toList());
+                .collect(toList());
     }
 }
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
index 78be16d..4cc26d2 100644
--- a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.rest.api.recovery;
 
+import io.micronaut.http.annotation.Body;
 import io.micronaut.http.annotation.Controller;
 import io.micronaut.http.annotation.Get;
+import io.micronaut.http.annotation.Post;
 import io.micronaut.http.annotation.Produces;
 import io.micronaut.http.annotation.QueryValue;
 import io.swagger.v3.oas.annotations.Operation;
@@ -73,4 +75,18 @@
             @Schema(description = "IDs of partitions to get states of. All partitions if empty.")
             Optional<Set<Integer>> partitionIds
     );
+
+    @Post("reset-partitions")
+    @Operation(
+            operationId = "reset-partitions",
+            description = "Updates assignments of partitions in a forced manner, allowing for the recovery of raft groups with "
+                    + "lost majorities."
+    )
+    @ApiResponse(responseCode = "200", description = "Partition states reset.")
+    @ApiResponse(responseCode = "500", description = "Internal error.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
+    @ApiResponse(responseCode = "400", description = "Bad request.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
+    @Produces(MediaType.APPLICATION_JSON)
+    CompletableFuture<Void> resetPartitions(@Body ResetPartitionsRequest command);
 }
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/ResetPartitionsRequest.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/ResetPartitionsRequest.java
new file mode 100644
index 0000000..d8a45ac
--- /dev/null
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/ResetPartitionsRequest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.media.Schema;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.Nullable;
+
+/** Disaster recovery request to reset partitions. */
+@Schema(description = "Reset partitions configuration.")
+public class ResetPartitionsRequest {
+    @Schema(description = "Name of the zone to reset partitions of. Without quotes, case-sensitive.")
+    private final String zoneName;
+
+    @Schema(description = "IDs of partitions to reset. All if empty.")
+    private final Set<Integer> partitionIds;
+
+    @Schema(description = "Fully-qualified name of the table to reset partitions of. Without quotes, case-sensitive.")
+    private final String tableName;
+
+    /** Constructor. */
+    @JsonCreator
+    public ResetPartitionsRequest(
+            @JsonProperty("zoneName") String zoneName,
+            @JsonProperty("tableName") String tableName,
+            @JsonProperty("partitionIds") @Nullable Collection<Integer> partitionIds
+    ) {
+        Objects.requireNonNull(zoneName);
+        Objects.requireNonNull(tableName);
+
+        this.zoneName = zoneName;
+        this.tableName = tableName;
+        this.partitionIds = partitionIds == null ? Set.of() : Set.copyOf(partitionIds);
+    }
+
+    /** Returns ids of partitions to reset. */
+    @JsonGetter("partitionIds")
+    public Set<Integer> partitionIds() {
+        return partitionIds;
+    }
+
+    /** Returns name of the zone to reset partitions of. */
+    @JsonGetter("zoneName")
+    public String zoneName() {
+        return zoneName;
+    }
+
+    /** Returns name of the table to reset partitions of. */
+    @JsonGetter("tableName")
+    public String tableName() {
+        return tableName;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+}
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteExceptionHandler.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteExceptionHandler.java
index a9e6b47..0488d32 100644
--- a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteExceptionHandler.java
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteExceptionHandler.java
@@ -23,6 +23,7 @@
 import io.micronaut.http.server.exceptions.ExceptionHandler;
 import jakarta.inject.Singleton;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.ignite.configuration.validation.ConfigurationValidationException;
 import org.apache.ignite.internal.rest.api.InvalidParam;
@@ -30,6 +31,7 @@
 import org.apache.ignite.internal.rest.constants.HttpCode;
 import org.apache.ignite.internal.rest.problem.HttpProblemResponse;
 import org.apache.ignite.lang.ErrorGroup;
+import org.apache.ignite.lang.ErrorGroups.Table;
 import org.apache.ignite.lang.IgniteException;
 import org.jetbrains.annotations.Nullable;
 
@@ -39,19 +41,12 @@
 @Singleton
 @Requires(classes = {IgniteException.class, ExceptionHandler.class})
 public class IgniteExceptionHandler implements ExceptionHandler<IgniteException, HttpResponse<? extends Problem>> {
+    private static final Set<Integer> BAD_REQUEST_CODES = Set.of(Table.TABLE_NOT_FOUND_ERR);
+
     @Override
     public HttpResponse<? extends Problem> handle(HttpRequest request, IgniteException exception) {
         String detail = extractDetailMessageOrNull(exception);
 
-        if (exception.getCause() instanceof IllegalArgumentException) {
-            return HttpProblemResponse.from(
-                    Problem.fromHttpCode(HttpCode.BAD_REQUEST)
-                            .detail(detail)
-                            .traceId(exception.traceId())
-                            .code(exception.codeAsString())
-            );
-        }
-
         if (exception.getCause() instanceof ConfigurationValidationException) {
             return HttpProblemResponse.from(
                     Problem.fromHttpCode(HttpCode.BAD_REQUEST)
@@ -62,6 +57,15 @@
             );
         }
 
+        if (exception.getCause() instanceof IllegalArgumentException || BAD_REQUEST_CODES.contains(exception.code())) {
+            return HttpProblemResponse.from(
+                    Problem.fromHttpCode(HttpCode.BAD_REQUEST)
+                            .detail(detail)
+                            .traceId(exception.traceId())
+                            .code(exception.codeAsString())
+            );
+        }
+
         return HttpProblemResponse.from(
                 Problem.fromHttpCode(HttpCode.INTERNAL_SERVER_ERROR)
                         .detail(detail)
diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerResetPartitionsTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerResetPartitionsTest.java
new file mode 100644
index 0000000..681af5d
--- /dev/null
+++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerResetPartitionsTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.recovery;
+
+import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
+import static org.apache.ignite.internal.rest.constants.HttpCode.OK;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import io.micronaut.http.HttpRequest;
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.MutableHttpRequest;
+import io.micronaut.http.client.HttpClient;
+import io.micronaut.http.client.annotation.Client;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import jakarta.inject.Inject;
+import java.util.Set;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.rest.api.recovery.ResetPartitionsRequest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Test for disaster recovery reset partitions command, positive cases. */
+@MicronautTest
+public class ItDisasterRecoveryControllerResetPartitionsTest extends ClusterPerTestIntegrationTest {
+    private static final String NODE_URL = "http://localhost:" + Cluster.BASE_HTTP_PORT;
+
+    private static final String FIRST_ZONE = "first_ZONE";
+
+    private static final String TABLE_NAME = "first_ZONE_table";
+
+    private static final String QUALIFIED_TABLE_NAME = "PUBLIC." + TABLE_NAME;
+
+    @Inject
+    @Client(NODE_URL + "/management/v1/recovery/")
+    HttpClient client;
+
+    @BeforeEach
+    public void setUp() {
+        executeSql(String.format("CREATE ZONE \"%s\" WITH storage_profiles='%s'", FIRST_ZONE, DEFAULT_AIPERSIST_PROFILE_NAME));
+        executeSql(String.format("CREATE TABLE PUBLIC.\"%s\" (id INT PRIMARY KEY, val INT) WITH PRIMARY_ZONE = '%s'", TABLE_NAME,
+                FIRST_ZONE));
+    }
+
+    @Test
+    public void testResetAllPartitions() {
+        MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST("/reset-partitions",
+                new ResetPartitionsRequest(FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of()));
+
+        HttpResponse<Void> response = client.toBlocking().exchange(post);
+
+        assertThat(response.getStatus().getCode(), is(OK.code()));
+    }
+
+    @Test
+    public void testResetSpecifiedPartitions() {
+        MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST("/reset-partitions",
+                new ResetPartitionsRequest(FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0)));
+
+        HttpResponse<Void> response = client.toBlocking().exchange(post);
+
+        assertThat(response.getStatus().getCode(), is(OK.code()));
+    }
+}
diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
index 1798a78..fe6cbe6 100644
--- a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
+++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
@@ -22,16 +22,20 @@
 import static java.util.stream.IntStream.range;
 import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static org.apache.ignite.internal.rest.constants.HttpCode.BAD_REQUEST;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import io.micronaut.http.HttpRequest;
 import io.micronaut.http.HttpResponse;
 import io.micronaut.http.HttpStatus;
+import io.micronaut.http.MutableHttpRequest;
 import io.micronaut.http.client.HttpClient;
 import io.micronaut.http.client.annotation.Client;
 import io.micronaut.http.client.exceptions.HttpClientResponseException;
@@ -47,6 +51,7 @@
 import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse;
 import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse;
 import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse;
+import org.apache.ignite.internal.rest.api.recovery.ResetPartitionsRequest;
 import org.apache.ignite.internal.util.CollectionUtils;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -58,7 +63,11 @@
 public class ItDisasterRecoveryControllerTest extends ClusterPerClassIntegrationTest {
     private static final String NODE_URL = "http://localhost:" + Cluster.BASE_HTTP_PORT;
 
-    private static final Set<String> ZONES = Set.of("first_ZONE", "second_ZONE", "third_ZONE");
+    private static final String FIRST_ZONE = "first_ZONE";
+    private static final String QUALIFIED_TABLE_NAME = "PUBLIC.first_ZONE_table";
+
+    private static final Set<String> ZONES = Set.of(FIRST_ZONE, "second_ZONE", "third_ZONE");
+
 
     private static final Set<String> MIXED_CASE_ZONES = Set.of("mixed_first_zone", "MIXED_FIRST_ZONE", "mixed_second_zone",
             "MIXED_SECOND_ZONE");
@@ -81,7 +90,7 @@
     public static void setUp() {
         ZONES_CONTAINING_TABLES.forEach(name -> {
             sql(String.format("CREATE ZONE \"%s\" WITH storage_profiles='%s'", name, DEFAULT_AIPERSIST_PROFILE_NAME));
-            sql(String.format("CREATE TABLE \"%s_table\" (id INT PRIMARY KEY, val INT) WITH PRIMARY_ZONE = '%1$s'", name));
+            sql(String.format("CREATE TABLE PUBLIC.\"%s_table\" (id INT PRIMARY KEY, val INT) WITH PRIMARY_ZONE = '%1$s'", name));
         });
 
         sql(String.format("CREATE ZONE \"%s\" WITH storage_profiles='%s'", EMPTY_ZONE, DEFAULT_AIPERSIST_PROFILE_NAME));
@@ -353,6 +362,68 @@
         checkGlobalStates(states, ZONES_CONTAINING_TABLES);
     }
 
+    @Test
+    public void testResetPartitionZoneNotFound() {
+        String unknownZone = "unknown_zone";
+
+        MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST("/reset-partitions",
+                new ResetPartitionsRequest(unknownZone, QUALIFIED_TABLE_NAME, Set.of()));
+
+        HttpClientResponseException e = assertThrows(HttpClientResponseException.class,
+                () -> client.toBlocking().exchange(post));
+
+        assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
+
+        assertThat(e.getMessage(), containsString("Distribution zone is not found [zoneName=" + unknownZone + "]"));
+    }
+
+    @Test
+    public void testResetPartitionTableNotFound() {
+        String tableName = "unknown_table";
+
+        MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST("/reset-partitions",
+                new ResetPartitionsRequest(FIRST_ZONE, tableName, Set.of()));
+
+        HttpClientResponseException e = assertThrows(HttpClientResponseException.class,
+                () -> client.toBlocking().exchange(post));
+
+        assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
+
+        assertThat(e.getMessage(), containsString("The table does not exist [name=" + tableName + "]"));
+    }
+
+    @Test
+    void testResetPartitionsIllegalPartitionNegative() {
+        MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST("/reset-partitions",
+                new ResetPartitionsRequest(FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0, 5, -1, -10)));
+
+        HttpClientResponseException e = assertThrows(HttpClientResponseException.class,
+                () -> client.toBlocking().exchange(post));
+
+        assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
+
+        assertThat(e.getMessage(), containsString("Partition ID can't be negative, found: -10"));
+    }
+
+    @Test
+    void testResetPartitionsPartitionsOutOfRange() {
+        MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST("/reset-partitions",
+                new ResetPartitionsRequest(FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(DEFAULT_PARTITION_COUNT)));
+
+        HttpClientResponseException e = assertThrows(HttpClientResponseException.class,
+                () -> client.toBlocking().exchange(post));
+
+        assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
+        assertThat(e.getMessage(), containsString(
+                String.format(
+                        "Partition IDs should be in range [0, %d] for zone %s, found: %d",
+                        DEFAULT_PARTITION_COUNT - 1,
+                        FIRST_ZONE,
+                        DEFAULT_PARTITION_COUNT
+                )
+        ));
+    }
+
     private static void checkLocalStates(List<LocalPartitionStateResponse> states, Set<String> zoneNames, Set<String> nodes) {
         assertFalse(states.isEmpty());
 
diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
index 182a0a5..65b1f50 100644
--- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
+++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
@@ -20,6 +20,7 @@
 import static java.util.Comparator.comparing;
 
 import io.micronaut.context.annotation.Requires;
+import io.micronaut.http.annotation.Body;
 import io.micronaut.http.annotation.Controller;
 import java.util.ArrayList;
 import java.util.List;
@@ -34,6 +35,7 @@
 import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse;
 import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse;
 import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse;
+import org.apache.ignite.internal.rest.api.recovery.ResetPartitionsRequest;
 import org.apache.ignite.internal.rest.exception.handler.IgniteInternalExceptionHandler;
 import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
 import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
@@ -78,6 +80,15 @@
                 .thenApply(DisasterRecoveryController::convertGlobalStates);
     }
 
+    @Override
+    public CompletableFuture<Void> resetPartitions(@Body ResetPartitionsRequest command) {
+        return disasterRecoveryManager.resetPartitions(
+                command.zoneName(),
+                command.tableName(),
+                command.partitionIds()
+        );
+    }
+
     private static LocalPartitionStatesResponse convertLocalStates(Map<TablePartitionId, LocalPartitionStateByNode> localStates) {
         List<LocalPartitionStateResponse> states = new ArrayList<>();
 
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index f128f99..341658c 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -67,6 +67,7 @@
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.TransactionException;
+import org.hamcrest.Matchers;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
@@ -79,7 +80,9 @@
     private static final int SCALE_DOWN_TIMEOUT_SECONDS = 2;
 
     /** Test table name. */
-    private static final String TABLE_NAME = "test";
+    private static final String TABLE_NAME = "TEST";
+
+    private static final String QUALIFIED_TABLE_NAME = "PUBLIC.TEST";
 
     private static final int ENTRIES = 2;
 
@@ -175,8 +178,8 @@
     }
 
     /**
-     * Tests that a situation from the test {@link #testInsertFailsIfMajorityIsLost()} it is possible to recover partition using a disaster
-     * recovery API. In this test, assignments will be (0, 3, 4)
+     * Tests that in a situation from the test {@link #testInsertFailsIfMajorityIsLost()} it is possible to recover partition using a
+     * disaster recovery API. In this test, assignments will be (0, 3, 4), according to {@link RendezvousAffinityFunction}.
      */
     @Test
     @ZoneParams(nodes = 5, replicas = 3, partitions = 1)
@@ -194,7 +197,12 @@
 
         waitForScale(node0, 3);
 
-        CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetPartitions(zoneId, tableId);
+        CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetPartitions(
+                zoneName,
+                QUALIFIED_TABLE_NAME,
+                Set.of()
+        );
+
         assertThat(updateFuture, willCompleteSuccessfully());
 
         awaitPrimaryReplica(node0, partId);
@@ -206,6 +214,55 @@
     }
 
     /**
+     * Tests that in a situation from the test {@link #testInsertFailsIfMajorityIsLost()} it is possible to recover specified partition
+     * using a disaster recovery API. In this test, assignments will be (0, 2, 4) and (1, 2, 4), according to
+     * {@link RendezvousAffinityFunction}.
+     */
+    @Test
+    @ZoneParams(nodes = 5, replicas = 3, partitions = 2)
+    void testManualRebalanceIfMajorityIsLostSpecifyPartitions() throws Exception {
+        int fixingPartId = 1;
+        int anotherPartId = 0;
+
+        IgniteImpl node0 = cluster.node(0);
+        Table table = node0.tables().table(TABLE_NAME);
+
+        awaitPrimaryReplica(node0, anotherPartId);
+
+        assertRealAssignments(node0, fixingPartId, 0, 2, 4);
+        assertRealAssignments(node0, anotherPartId, 1, 2, 4);
+
+        stopNodesInParallel(2, 4);
+
+        waitForScale(node0, 3);
+
+        // Should fail because majority was lost.
+        List<Throwable> fixingPartErrorsBeforeReset = insertValues(table, fixingPartId, 0);
+        assertThat(fixingPartErrorsBeforeReset, Matchers.not(empty()));
+
+        List<Throwable> anotherPartErrorsBeforeReset = insertValues(table, anotherPartId, 0);
+        assertThat(anotherPartErrorsBeforeReset, Matchers.not(empty()));
+
+        CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetPartitions(
+                zoneName,
+                QUALIFIED_TABLE_NAME,
+                Set.of(anotherPartId)
+        );
+
+        assertThat(updateFuture, willCompleteSuccessfully());
+
+        awaitPrimaryReplica(node0, anotherPartId);
+
+        // Shouldn't fail because partition assignments were reset.
+        List<Throwable> fixedPartErrors = insertValues(table, anotherPartId, 0);
+        assertThat(fixedPartErrors, is(empty()));
+
+        // Was not specified in reset, shouldn't be fixed. */
+        List<Throwable> anotherPartErrors = insertValues(table, fixingPartId, 0);
+        assertThat(anotherPartErrors, Matchers.not(empty()));
+    }
+
+    /**
      * Tests a scenario where there's a single partition on a node 1, and the node that hosts it is lost. Reconfiguration of the zone should
      * create new raft group on the remaining node, without any data.
      */
@@ -225,7 +282,12 @@
 
         waitForScale(node0, 1);
 
-        CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetPartitions(zoneId, tableId);
+        CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetPartitions(
+                zoneName,
+                QUALIFIED_TABLE_NAME,
+                Set.of()
+        );
+
         assertThat(updateFuture, willCompleteSuccessfully());
 
         awaitPrimaryReplica(node0, partId);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 436ccbb..a569933 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -39,6 +39,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -53,6 +54,7 @@
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
+import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -77,6 +79,7 @@
 import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesResponse;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.lang.TableNotFoundException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.core.State;
@@ -85,7 +88,7 @@
 /**
  * Manager, responsible for "disaster recovery" operations.
  * Internally it triggers meta-storage updates, in order to acquire unique causality token.
- * As a reaction to these updates, manager performs actual recovery operations, such as {@link #resetPartitions(int, int)}.
+ * As a reaction to these updates, manager performs actual recovery operations, such as {@link #resetPartitions(String, String, Set)}.
  * More details are in the <a href="https://issues.apache.org/jira/browse/IGNITE-21140">epic</a>.
  */
 public class DisasterRecoveryManager implements IgniteComponent {
@@ -192,12 +195,23 @@
      * assignments with {@code force} flag remove old stable nodes from the distribution, and force new Raft configuration via "resetPeers"
      * so that a new leader could be elected.
      *
-     * @param zoneId Distribution zone ID.
-     * @param tableId Table ID.
+     * @param zoneName Name of the distribution zone.
+     * @param tableName Fully-qualified table name.
+     * @param partitionIds IDs of partitions to reset. If empty, reset all zone's partitions.
      * @return Operation future.
      */
-    public CompletableFuture<Void> resetPartitions(int zoneId, int tableId) {
-        return processNewRequest(new ManualGroupUpdateRequest(UUID.randomUUID(), zoneId, tableId));
+    public CompletableFuture<Void> resetPartitions(String zoneName, String tableName, Set<Integer> partitionIds) {
+        Catalog catalog = catalogManager.catalog(catalogManager.latestCatalogVersion());
+
+        int tableId = Optional.ofNullable(catalog.table(tableName))
+                .orElseThrow(() -> new TableNotFoundException(tableName)).id();
+
+        CatalogZoneDescriptor zone = Optional.ofNullable(catalog.zone(zoneName))
+                .orElseThrow(() -> new DistributionZoneNotFoundException(zoneName));
+
+        checkPartitionsRange(partitionIds, Set.of(zone));
+
+        return processNewRequest(new ManualGroupUpdateRequest(UUID.randomUUID(), zone.id(), tableId, partitionIds));
     }
 
     /**
@@ -312,6 +326,26 @@
         });
     }
 
+    private static void checkPartitionsRange(Set<Integer> partitionIds, Collection<CatalogZoneDescriptor> zones) {
+        if (partitionIds.isEmpty()) {
+            return;
+        }
+
+        int minPartition = partitionIds.stream().min(Integer::compare).get();
+
+        if (minPartition < 0) {
+            throw new IllegalPartitionIdException(minPartition);
+        }
+
+        int maxPartition = partitionIds.stream().max(Integer::compare).get();
+
+        zones.forEach(zone -> {
+            if (maxPartition >= zone.partitions()) {
+                throw new IllegalPartitionIdException(maxPartition, zone.partitions(), zone.name());
+            }
+        });
+    }
+
     private Set<NodeWithAttributes> getNodes(Set<String> nodeNames) throws NodesNotFoundException {
         if (nodeNames.isEmpty()) {
             return dzManager.logicalTopology();
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
index 063068e..76350e8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
@@ -42,10 +42,13 @@
 
     private final int tableId;
 
-    ManualGroupUpdateRequest(UUID operationId, int zoneId, int tableId) {
+    private final Set<Integer> partitionIds;
+
+    ManualGroupUpdateRequest(UUID operationId, int zoneId, int tableId, Set<Integer> partitionIds) {
         this.operationId = operationId;
         this.zoneId = zoneId;
         this.tableId = tableId;
+        this.partitionIds = partitionIds == null ? Set.of() : Set.copyOf(partitionIds);
     }
 
     @Override
@@ -62,6 +65,10 @@
         return tableId;
     }
 
+    public Set<Integer> partitionIds() {
+        return partitionIds;
+    }
+
     @Override
     public CompletableFuture<Void> handle(
             DisasterRecoveryManager disasterRecoveryManager,
@@ -87,6 +94,7 @@
             CompletableFuture<?>[] futures = RebalanceUtil.forceAssignmentsUpdate(
                     tableDescriptor,
                     zoneDescriptor,
+                    partitionIds,
                     dataNodes,
                     nodeConsistentIds,
                     msRevision,
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/ZonesNotFoundException.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/ZonesNotFoundException.java
index fcf72bb..aadfe42 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/ZonesNotFoundException.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/ZonesNotFoundException.java
@@ -21,7 +21,7 @@
 
 import java.util.Set;
 
-/** Exception is thrown when appropriate node can`t be found. */
+/** Exception is thrown when appropriate zones can`t be found. */
 public class ZonesNotFoundException extends DisasterRecoveryException {
     private static final long serialVersionUID = -8475588176132321568L;