[IGNITE-21295] Implement REST API for manual raft group configuration update (#3701)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/TableNotFoundException.java b/modules/api/src/main/java/org/apache/ignite/lang/TableNotFoundException.java
index 24763d0..2559ae1 100644
--- a/modules/api/src/main/java/org/apache/ignite/lang/TableNotFoundException.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/TableNotFoundException.java
@@ -38,6 +38,15 @@
}
/**
+ * Creates an exception with the given fully-qualified table name.
+ *
+ * @param tableName Fully-qualified table name.
+ */
+ public TableNotFoundException(String tableName) {
+ super(TABLE_NOT_FOUND_ERR, "The table does not exist [name=" + tableName + ']');
+ }
+
+ /**
* Creates an exception with the given trace ID, error code, detailed message, and cause.
*
* @param traceId Unique identifier of the exception.
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
index 301a451..f35261e 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
@@ -20,6 +20,9 @@
import static it.unimi.dsi.fastutil.ints.Int2ObjectMaps.unmodifiable;
import static java.util.Collections.unmodifiableList;
import static java.util.Comparator.comparingInt;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toUnmodifiableMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap.Entry;
@@ -28,12 +31,11 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.function.Function;
import java.util.stream.Collector;
-import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
@@ -49,12 +51,12 @@
*/
public class Catalog {
private static <T extends CatalogObjectDescriptor> Collector<T, ?, Map<String, T>> toMapByName() {
- return Collectors.toUnmodifiableMap(CatalogObjectDescriptor::name, Function.identity());
+ return toUnmodifiableMap(CatalogObjectDescriptor::name, identity());
}
private static <T extends CatalogObjectDescriptor> Collector<T, ?, Int2ObjectMap<T>> toMapById() {
- return Collectors.collectingAndThen(
- CollectionUtils.toIntMapCollector(CatalogObjectDescriptor::id, Function.identity()),
+ return collectingAndThen(
+ CollectionUtils.toIntMapCollector(CatalogObjectDescriptor::id, identity()),
Int2ObjectMaps::unmodifiable
);
}
@@ -64,6 +66,7 @@
private final long activationTimestamp;
private final Map<String, CatalogSchemaDescriptor> schemasByName;
private final Map<String, CatalogZoneDescriptor> zonesByName;
+ private final Map<String, CatalogTableDescriptor> tablesByName;
private final @Nullable CatalogZoneDescriptor defaultZone;
@IgniteToStringExclude
@@ -110,6 +113,13 @@
schemasByName = schemas.stream().collect(toMapByName());
zonesByName = zones.stream().collect(toMapByName());
+ tablesByName = new HashMap<>();
+ for (CatalogSchemaDescriptor schema : schemas) {
+ for (CatalogTableDescriptor table : schema.tables()) {
+ tablesByName.put(schema.name() + "." + table.name(), table);
+ }
+ }
+
schemasById = schemas.stream().collect(toMapById());
tablesById = schemas.stream().flatMap(s -> Arrays.stream(s.tables())).collect(toMapById());
indexesById = schemas.stream().flatMap(s -> Arrays.stream(s.indexes())).collect(toMapById());
@@ -155,6 +165,15 @@
return tablesById.get(tableId);
}
+ /**
+ * Returns table descriptor by fully-qualified table name.
+ *
+ * @param tableName Fully-qualified table name. Case-sensitive, without quotes.
+ */
+ public @Nullable CatalogTableDescriptor table(String tableName) {
+ return tablesByName.get(tableName);
+ }
+
public Collection<CatalogTableDescriptor> tables() {
return tablesById.values();
}
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
index 59750bd..1137538 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
@@ -42,6 +42,15 @@
* The constructor.
*
* @param zoneName Zone name.
+ */
+ public DistributionZoneNotFoundException(String zoneName) {
+ super(ZONE_NOT_FOUND_ERR, "Distribution zone is not found [zoneName=" + zoneName + ']');
+ }
+
+ /**
+ * The constructor.
+ *
+ * @param zoneName Zone name.
* @param cause Optional nested exception (can be {@code null}).
*/
public DistributionZoneNotFoundException(String zoneName, @Nullable Throwable cause) {
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index bf97314..6fdec03 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -17,7 +17,8 @@
package org.apache.ignite.internal.distributionzones.rebalance;
-import static java.util.Collections.emptyList;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.ASSIGNMENT_NOT_UPDATED;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.OUTDATED_UPDATE_RECEIVED;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.PENDING_KEY_UPDATED;
@@ -36,7 +37,6 @@
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -44,7 +44,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
@@ -282,9 +281,10 @@
long storageRevision,
MetaStorageManager metaStorageManager
) {
- CompletableFuture<List<Assignments>> tableAssignmentsFut = tableAssignments(
+ CompletableFuture<Map<Integer, Assignments>> tableAssignmentsFut = tableAssignments(
metaStorageManager,
tableDescriptor.id(),
+ Set.of(),
zoneDescriptor.partitions()
);
@@ -321,6 +321,7 @@
*
* @param tableDescriptor Table descriptor.
* @param zoneDescriptor Zone descriptor.
+ * @param partitionIds Partition IDs to force assignments for. If empty, all of the zone's partitions are reassigned.
* @param dataNodes Current DZ data nodes.
* @param aliveNodesConsistentIds Set of alive nodes according to logical topology.
* @param revision Meta-storage revision to be associated with reassignment.
@@ -330,25 +331,31 @@
public static CompletableFuture<?>[] forceAssignmentsUpdate(
CatalogTableDescriptor tableDescriptor,
CatalogZoneDescriptor zoneDescriptor,
+ Set<Integer> partitionIds,
Set<String> dataNodes,
Set<String> aliveNodesConsistentIds,
long revision,
MetaStorageManager metaStorageManager
) {
- CompletableFuture<List<Assignments>> tableAssignmentsFut = tableAssignments(
+ CompletableFuture<Map<Integer, Assignments>> tableAssignmentsFut = tableAssignments(
metaStorageManager,
tableDescriptor.id(),
+ partitionIds,
zoneDescriptor.partitions()
);
- CompletableFuture<?>[] futures = new CompletableFuture[zoneDescriptor.partitions()];
-
Set<String> aliveDataNodes = CollectionUtils.intersect(dataNodes, aliveNodesConsistentIds);
- for (int partId = 0; partId < zoneDescriptor.partitions(); partId++) {
- TablePartitionId replicaGrpId = new TablePartitionId(tableDescriptor.id(), partId);
+ int[] ids = partitionIds.isEmpty()
+ ? IntStream.range(0, zoneDescriptor.partitions()).toArray()
+ : partitionIds.stream().mapToInt(Integer::intValue).toArray();
- futures[partId] = tableAssignmentsFut.thenCompose(tableAssignments ->
+ CompletableFuture<?>[] futures = new CompletableFuture[ids.length];
+
+ for (int i = 0; i < ids.length; i++) {
+ TablePartitionId replicaGrpId = new TablePartitionId(tableDescriptor.id(), ids[i]);
+
+ futures[i] = tableAssignmentsFut.thenCompose(tableAssignments ->
tableAssignments.isEmpty() ? nullCompletedFuture() : manualPartitionUpdate(
replicaGrpId,
aliveDataNodes,
@@ -620,7 +627,7 @@
* @return Result of the subtraction.
*/
public static <T> Set<T> subtract(Set<T> minuend, Set<T> subtrahend) {
- return minuend.stream().filter(v -> !subtrahend.contains(v)).collect(Collectors.toSet());
+ return minuend.stream().filter(v -> !subtrahend.contains(v)).collect(toSet());
}
/**
@@ -646,7 +653,7 @@
* @return Result of the intersection.
*/
public static <T> Set<T> intersect(Set<T> op1, Set<T> op2) {
- return op1.stream().filter(op2::contains).collect(Collectors.toSet());
+ return op1.stream().filter(op2::contains).collect(toSet());
}
/**
@@ -689,47 +696,53 @@
}
/**
- * Returns table assignments for all table partitions from meta storage.
+ * Returns table assignments for table partitions from meta storage.
*
* @param metaStorageManager Meta storage manager.
* @param tableId Table id.
- * @param numberOfPartitions Number of partitions.
+ * @param partitionIds IDs of partitions to get assignments for. If empty, assignments of all partitions are returned.
+ * @param numberOfPartitions Number of partitions. Ignored if partition IDs are specified.
* @return Future with table assignments as a value.
*/
- static CompletableFuture<List<Assignments>> tableAssignments(
+ static CompletableFuture<Map<Integer, Assignments>> tableAssignments(
MetaStorageManager metaStorageManager,
int tableId,
+ Set<Integer> partitionIds,
int numberOfPartitions
) {
Map<ByteArray, Integer> partitionKeysToPartitionNumber = new HashMap<>();
- for (int i = 0; i < numberOfPartitions; i++) {
- partitionKeysToPartitionNumber.put(stablePartAssignmentsKey(new TablePartitionId(tableId, i)), i);
+ Collection<Integer> ids = partitionIds.isEmpty()
+ ? IntStream.range(0, numberOfPartitions).boxed().collect(toList())
+ : partitionIds;
+
+ for (Integer partId : ids) {
+ partitionKeysToPartitionNumber.put(stablePartAssignmentsKey(new TablePartitionId(tableId, partId)), partId);
}
return metaStorageManager.getAll(partitionKeysToPartitionNumber.keySet())
.thenApply(entries -> {
if (entries.isEmpty()) {
- return emptyList();
+ return Map.of();
}
- Assignments[] result = new Assignments[numberOfPartitions];
+ Map<Integer, Assignments> result = new HashMap<>();
int numberOfMsPartitions = 0;
for (var mapEntry : entries.entrySet()) {
Entry entry = mapEntry.getValue();
if (!entry.empty() && !entry.tombstone()) {
- result[partitionKeysToPartitionNumber.get(mapEntry.getKey())] = Assignments.fromBytes(entry.value());
+ result.put(partitionKeysToPartitionNumber.get(mapEntry.getKey()), Assignments.fromBytes(entry.value()));
numberOfMsPartitions++;
}
}
- assert numberOfMsPartitions == 0 || numberOfMsPartitions == numberOfPartitions
+ assert numberOfMsPartitions == 0 || numberOfMsPartitions == entries.size()
: "Invalid number of stable partition entries received from meta storage [received="
- + numberOfMsPartitions + ", numberOfPartitions=" + numberOfPartitions + ", tableId=" + tableId + "].";
+ + numberOfMsPartitions + ", numberOfPartitions=" + entries.size() + ", tableId=" + tableId + "].";
- return numberOfMsPartitions == 0 ? emptyList() : Arrays.asList(result);
+ return numberOfMsPartitions == 0 ? Map.of() : result;
});
}
@@ -756,6 +769,6 @@
return Assignments.fromBytes(e.value());
})
- .collect(Collectors.toList());
+ .collect(toList());
}
}
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
index 78be16d..4cc26d2 100644
--- a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.rest.api.recovery;
+import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
+import io.micronaut.http.annotation.Post;
import io.micronaut.http.annotation.Produces;
import io.micronaut.http.annotation.QueryValue;
import io.swagger.v3.oas.annotations.Operation;
@@ -73,4 +75,18 @@
@Schema(description = "IDs of partitions to get states of. All partitions if empty.")
Optional<Set<Integer>> partitionIds
);
+
+ @Post("reset-partitions")
+ @Operation(
+ operationId = "reset-partitions",
+ description = "Updates assignments of partitions in a forced manner, allowing for the recovery of raft groups with "
+ + "lost majorities."
+ )
+ @ApiResponse(responseCode = "200", description = "Partition states reset.")
+ @ApiResponse(responseCode = "500", description = "Internal error.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
+ @ApiResponse(responseCode = "400", description = "Bad request.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
+ @Produces(MediaType.APPLICATION_JSON)
+ CompletableFuture<Void> resetPartitions(@Body ResetPartitionsRequest command);
}
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/ResetPartitionsRequest.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/ResetPartitionsRequest.java
new file mode 100644
index 0000000..d8a45ac
--- /dev/null
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/ResetPartitionsRequest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.media.Schema;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.Nullable;
+
+/** Disaster recovery request to reset partitions. */
+@Schema(description = "Reset partitions configuration.")
+public class ResetPartitionsRequest {
+ @Schema(description = "Name of the zone to reset partitions of. Without quotes, case-sensitive.")
+ private final String zoneName;
+
+ @Schema(description = "IDs of partitions to reset. All if empty.")
+ private final Set<Integer> partitionIds;
+
+ @Schema(description = "Fully-qualified name of the table to reset partitions of. Without quotes, case-sensitive.")
+ private final String tableName;
+
+ /** Constructor. */
+ @JsonCreator
+ public ResetPartitionsRequest(
+ @JsonProperty("zoneName") String zoneName,
+ @JsonProperty("tableName") String tableName,
+ @JsonProperty("partitionIds") @Nullable Collection<Integer> partitionIds
+ ) {
+ Objects.requireNonNull(zoneName);
+ Objects.requireNonNull(tableName);
+
+ this.zoneName = zoneName;
+ this.tableName = tableName;
+ this.partitionIds = partitionIds == null ? Set.of() : Set.copyOf(partitionIds);
+ }
+
+ /** Returns IDs of partitions to reset. */
+ @JsonGetter("partitionIds")
+ public Set<Integer> partitionIds() {
+ return partitionIds;
+ }
+
+ /** Returns name of the zone to reset partitions of. */
+ @JsonGetter("zoneName")
+ public String zoneName() {
+ return zoneName;
+ }
+
+ /** Returns name of the table to reset partitions of. */
+ @JsonGetter("tableName")
+ public String tableName() {
+ return tableName;
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+}
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteExceptionHandler.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteExceptionHandler.java
index a9e6b47..0488d32 100644
--- a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteExceptionHandler.java
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteExceptionHandler.java
@@ -23,6 +23,7 @@
import io.micronaut.http.server.exceptions.ExceptionHandler;
import jakarta.inject.Singleton;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.validation.ConfigurationValidationException;
import org.apache.ignite.internal.rest.api.InvalidParam;
@@ -30,6 +31,7 @@
import org.apache.ignite.internal.rest.constants.HttpCode;
import org.apache.ignite.internal.rest.problem.HttpProblemResponse;
import org.apache.ignite.lang.ErrorGroup;
+import org.apache.ignite.lang.ErrorGroups.Table;
import org.apache.ignite.lang.IgniteException;
import org.jetbrains.annotations.Nullable;
@@ -39,19 +41,12 @@
@Singleton
@Requires(classes = {IgniteException.class, ExceptionHandler.class})
public class IgniteExceptionHandler implements ExceptionHandler<IgniteException, HttpResponse<? extends Problem>> {
+ private static final Set<Integer> BAD_REQUEST_CODES = Set.of(Table.TABLE_NOT_FOUND_ERR);
+
@Override
public HttpResponse<? extends Problem> handle(HttpRequest request, IgniteException exception) {
String detail = extractDetailMessageOrNull(exception);
- if (exception.getCause() instanceof IllegalArgumentException) {
- return HttpProblemResponse.from(
- Problem.fromHttpCode(HttpCode.BAD_REQUEST)
- .detail(detail)
- .traceId(exception.traceId())
- .code(exception.codeAsString())
- );
- }
-
if (exception.getCause() instanceof ConfigurationValidationException) {
return HttpProblemResponse.from(
Problem.fromHttpCode(HttpCode.BAD_REQUEST)
@@ -62,6 +57,15 @@
);
}
+ if (exception.getCause() instanceof IllegalArgumentException || BAD_REQUEST_CODES.contains(exception.code())) {
+ return HttpProblemResponse.from(
+ Problem.fromHttpCode(HttpCode.BAD_REQUEST)
+ .detail(detail)
+ .traceId(exception.traceId())
+ .code(exception.codeAsString())
+ );
+ }
+
return HttpProblemResponse.from(
Problem.fromHttpCode(HttpCode.INTERNAL_SERVER_ERROR)
.detail(detail)
diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerResetPartitionsTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerResetPartitionsTest.java
new file mode 100644
index 0000000..681af5d
--- /dev/null
+++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerResetPartitionsTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.recovery;
+
+import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
+import static org.apache.ignite.internal.rest.constants.HttpCode.OK;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import io.micronaut.http.HttpRequest;
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.MutableHttpRequest;
+import io.micronaut.http.client.HttpClient;
+import io.micronaut.http.client.annotation.Client;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import jakarta.inject.Inject;
+import java.util.Set;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.rest.api.recovery.ResetPartitionsRequest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Test for disaster recovery reset partitions command, positive cases. */
+@MicronautTest
+public class ItDisasterRecoveryControllerResetPartitionsTest extends ClusterPerTestIntegrationTest {
+ private static final String NODE_URL = "http://localhost:" + Cluster.BASE_HTTP_PORT;
+
+ private static final String FIRST_ZONE = "first_ZONE";
+
+ private static final String TABLE_NAME = "first_ZONE_table";
+
+ private static final String QUALIFIED_TABLE_NAME = "PUBLIC." + TABLE_NAME;
+
+ @Inject
+ @Client(NODE_URL + "/management/v1/recovery/")
+ HttpClient client;
+
+ @BeforeEach
+ public void setUp() {
+ executeSql(String.format("CREATE ZONE \"%s\" WITH storage_profiles='%s'", FIRST_ZONE, DEFAULT_AIPERSIST_PROFILE_NAME));
+ executeSql(String.format("CREATE TABLE PUBLIC.\"%s\" (id INT PRIMARY KEY, val INT) WITH PRIMARY_ZONE = '%s'", TABLE_NAME,
+ FIRST_ZONE));
+ }
+
+ @Test
+ public void testResetAllPartitions() {
+ MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST("/reset-partitions",
+ new ResetPartitionsRequest(FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of()));
+
+ HttpResponse<Void> response = client.toBlocking().exchange(post);
+
+ assertThat(response.getStatus().getCode(), is(OK.code()));
+ }
+
+ @Test
+ public void testResetSpecifiedPartitions() {
+ MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST("/reset-partitions",
+ new ResetPartitionsRequest(FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0)));
+
+ HttpResponse<Void> response = client.toBlocking().exchange(post);
+
+ assertThat(response.getStatus().getCode(), is(OK.code()));
+ }
+}
diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
index 1798a78..fe6cbe6 100644
--- a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
+++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
@@ -22,16 +22,20 @@
import static java.util.stream.IntStream.range;
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static org.apache.ignite.internal.rest.constants.HttpCode.BAD_REQUEST;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
+import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
@@ -47,6 +51,7 @@
import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse;
import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse;
import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse;
+import org.apache.ignite.internal.rest.api.recovery.ResetPartitionsRequest;
import org.apache.ignite.internal.util.CollectionUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -58,7 +63,11 @@
public class ItDisasterRecoveryControllerTest extends ClusterPerClassIntegrationTest {
private static final String NODE_URL = "http://localhost:" + Cluster.BASE_HTTP_PORT;
- private static final Set<String> ZONES = Set.of("first_ZONE", "second_ZONE", "third_ZONE");
+ private static final String FIRST_ZONE = "first_ZONE";
+ private static final String QUALIFIED_TABLE_NAME = "PUBLIC.first_ZONE_table";
+
+ private static final Set<String> ZONES = Set.of(FIRST_ZONE, "second_ZONE", "third_ZONE");
+
private static final Set<String> MIXED_CASE_ZONES = Set.of("mixed_first_zone", "MIXED_FIRST_ZONE", "mixed_second_zone",
"MIXED_SECOND_ZONE");
@@ -81,7 +90,7 @@
public static void setUp() {
ZONES_CONTAINING_TABLES.forEach(name -> {
sql(String.format("CREATE ZONE \"%s\" WITH storage_profiles='%s'", name, DEFAULT_AIPERSIST_PROFILE_NAME));
- sql(String.format("CREATE TABLE \"%s_table\" (id INT PRIMARY KEY, val INT) WITH PRIMARY_ZONE = '%1$s'", name));
+ sql(String.format("CREATE TABLE PUBLIC.\"%s_table\" (id INT PRIMARY KEY, val INT) WITH PRIMARY_ZONE = '%1$s'", name));
});
sql(String.format("CREATE ZONE \"%s\" WITH storage_profiles='%s'", EMPTY_ZONE, DEFAULT_AIPERSIST_PROFILE_NAME));
@@ -353,6 +362,68 @@
checkGlobalStates(states, ZONES_CONTAINING_TABLES);
}
+ @Test
+ public void testResetPartitionZoneNotFound() {
+ String unknownZone = "unknown_zone";
+
+ MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST("/reset-partitions",
+ new ResetPartitionsRequest(unknownZone, QUALIFIED_TABLE_NAME, Set.of()));
+
+ HttpClientResponseException e = assertThrows(HttpClientResponseException.class,
+ () -> client.toBlocking().exchange(post));
+
+ assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
+
+ assertThat(e.getMessage(), containsString("Distribution zone is not found [zoneName=" + unknownZone + "]"));
+ }
+
+ @Test
+ public void testResetPartitionTableNotFound() {
+ String tableName = "unknown_table";
+
+ MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST("/reset-partitions",
+ new ResetPartitionsRequest(FIRST_ZONE, tableName, Set.of()));
+
+ HttpClientResponseException e = assertThrows(HttpClientResponseException.class,
+ () -> client.toBlocking().exchange(post));
+
+ assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
+
+ assertThat(e.getMessage(), containsString("The table does not exist [name=" + tableName + "]"));
+ }
+
+ @Test
+ void testResetPartitionsIllegalPartitionNegative() {
+ MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST("/reset-partitions",
+ new ResetPartitionsRequest(FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0, 5, -1, -10)));
+
+ HttpClientResponseException e = assertThrows(HttpClientResponseException.class,
+ () -> client.toBlocking().exchange(post));
+
+ assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
+
+ assertThat(e.getMessage(), containsString("Partition ID can't be negative, found: -10"));
+ }
+
+ @Test
+ void testResetPartitionsPartitionsOutOfRange() {
+ MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST("/reset-partitions",
+ new ResetPartitionsRequest(FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(DEFAULT_PARTITION_COUNT)));
+
+ HttpClientResponseException e = assertThrows(HttpClientResponseException.class,
+ () -> client.toBlocking().exchange(post));
+
+ assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
+ assertThat(e.getMessage(), containsString(
+ String.format(
+ "Partition IDs should be in range [0, %d] for zone %s, found: %d",
+ DEFAULT_PARTITION_COUNT - 1,
+ FIRST_ZONE,
+ DEFAULT_PARTITION_COUNT
+ )
+ ));
+ }
+
private static void checkLocalStates(List<LocalPartitionStateResponse> states, Set<String> zoneNames, Set<String> nodes) {
assertFalse(states.isEmpty());
diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
index 182a0a5..65b1f50 100644
--- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
+++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
@@ -20,6 +20,7 @@
import static java.util.Comparator.comparing;
import io.micronaut.context.annotation.Requires;
+import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import java.util.ArrayList;
import java.util.List;
@@ -34,6 +35,7 @@
import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse;
import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse;
import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse;
+import org.apache.ignite.internal.rest.api.recovery.ResetPartitionsRequest;
import org.apache.ignite.internal.rest.exception.handler.IgniteInternalExceptionHandler;
import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
@@ -78,6 +80,15 @@
.thenApply(DisasterRecoveryController::convertGlobalStates);
}
+ @Override
+ public CompletableFuture<Void> resetPartitions(@Body ResetPartitionsRequest command) {
+ return disasterRecoveryManager.resetPartitions(
+ command.zoneName(),
+ command.tableName(),
+ command.partitionIds()
+ );
+ }
+
private static LocalPartitionStatesResponse convertLocalStates(Map<TablePartitionId, LocalPartitionStateByNode> localStates) {
List<LocalPartitionStateResponse> states = new ArrayList<>();
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index f128f99..341658c 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -67,6 +67,7 @@
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionException;
+import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -79,7 +80,9 @@
private static final int SCALE_DOWN_TIMEOUT_SECONDS = 2;
/** Test table name. */
- private static final String TABLE_NAME = "test";
+ private static final String TABLE_NAME = "TEST";
+
+ private static final String QUALIFIED_TABLE_NAME = "PUBLIC.TEST";
private static final int ENTRIES = 2;
@@ -175,8 +178,8 @@
}
/**
- * Tests that a situation from the test {@link #testInsertFailsIfMajorityIsLost()} it is possible to recover partition using a disaster
- * recovery API. In this test, assignments will be (0, 3, 4)
+ * Tests that in a situation from the test {@link #testInsertFailsIfMajorityIsLost()} it is possible to recover partition using a
+ * disaster recovery API. In this test, assignments will be (0, 3, 4), according to {@link RendezvousAffinityFunction}.
*/
@Test
@ZoneParams(nodes = 5, replicas = 3, partitions = 1)
@@ -194,7 +197,12 @@
waitForScale(node0, 3);
- CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetPartitions(zoneId, tableId);
+ CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetPartitions(
+ zoneName,
+ QUALIFIED_TABLE_NAME,
+ Set.of()
+ );
+
assertThat(updateFuture, willCompleteSuccessfully());
awaitPrimaryReplica(node0, partId);
@@ -206,6 +214,55 @@
}
/**
+ * Tests that in a situation from the test {@link #testInsertFailsIfMajorityIsLost()} it is possible to recover only the specified
+ * partition using a disaster recovery API, while a partition that was not specified stays broken. In this test, assignments will be
+ * (0, 2, 4) for partition 1 and (1, 2, 4) for partition 0, according to {@link RendezvousAffinityFunction}.
+ */
+ @Test
+ @ZoneParams(nodes = 5, replicas = 3, partitions = 2)
+ void testManualRebalanceIfMajorityIsLostSpecifyPartitions() throws Exception {
+     // The only partition that is passed to the reset call below.
+     int resetPartId = 0;
+     // Deliberately left out of the reset request, so it must stay broken.
+     int untouchedPartId = 1;
+
+     IgniteImpl node0 = cluster.node(0);
+     Table table = node0.tables().table(TABLE_NAME);
+
+     awaitPrimaryReplica(node0, resetPartId);
+
+     assertRealAssignments(node0, untouchedPartId, 0, 2, 4);
+     assertRealAssignments(node0, resetPartId, 1, 2, 4);
+
+     // Nodes 2 and 4 are part of both assignments, so both partitions lose their majority.
+     stopNodesInParallel(2, 4);
+
+     waitForScale(node0, 3);
+
+     // Inserts into either partition should fail because majority was lost.
+     List<Throwable> untouchedPartErrorsBeforeReset = insertValues(table, untouchedPartId, 0);
+     assertThat(untouchedPartErrorsBeforeReset, Matchers.not(empty()));
+
+     List<Throwable> resetPartErrorsBeforeReset = insertValues(table, resetPartId, 0);
+     assertThat(resetPartErrorsBeforeReset, Matchers.not(empty()));
+
+     Set<Integer> partitionsToReset = Set.of(resetPartId);
+     CompletableFuture<?> resetFuture = node0.disasterRecoveryManager()
+             .resetPartitions(zoneName, QUALIFIED_TABLE_NAME, partitionsToReset);
+
+     assertThat(resetFuture, willCompleteSuccessfully());
+
+     awaitPrimaryReplica(node0, resetPartId);
+
+     // Shouldn't fail because partition assignments were reset.
+     List<Throwable> resetPartErrorsAfterReset = insertValues(table, resetPartId, 0);
+     assertThat(resetPartErrorsAfterReset, is(empty()));
+
+     // Was not specified in reset, shouldn't be fixed.
+     List<Throwable> untouchedPartErrorsAfterReset = insertValues(table, untouchedPartId, 0);
+     assertThat(untouchedPartErrorsAfterReset, Matchers.not(empty()));
+ }
+
+ /**
* Tests a scenario where there's a single partition on a node 1, and the node that hosts it is lost. Reconfiguration of the zone should
* create new raft group on the remaining node, without any data.
*/
@@ -225,7 +282,12 @@
waitForScale(node0, 1);
- CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetPartitions(zoneId, tableId);
+ CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetPartitions(
+ zoneName,
+ QUALIFIED_TABLE_NAME,
+ Set.of()
+ );
+
assertThat(updateFuture, willCompleteSuccessfully());
awaitPrimaryReplica(node0, partId);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 436ccbb..a569933 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -39,6 +39,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -53,6 +54,7 @@
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
+import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -77,6 +79,7 @@
import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesResponse;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.core.State;
@@ -85,7 +88,7 @@
/**
* Manager, responsible for "disaster recovery" operations.
* Internally it triggers meta-storage updates, in order to acquire unique causality token.
- * As a reaction to these updates, manager performs actual recovery operations, such as {@link #resetPartitions(int, int)}.
+ * As a reaction to these updates, manager performs actual recovery operations, such as {@link #resetPartitions(String, String, Set)}.
* More details are in the <a href="https://issues.apache.org/jira/browse/IGNITE-21140">epic</a>.
*/
public class DisasterRecoveryManager implements IgniteComponent {
@@ -192,12 +195,23 @@
* assignments with {@code force} flag remove old stable nodes from the distribution, and force new Raft configuration via "resetPeers"
* so that a new leader could be elected.
*
- * @param zoneId Distribution zone ID.
- * @param tableId Table ID.
+ * @param zoneName Name of the distribution zone.
+ * @param tableName Fully-qualified table name.
+ * @param partitionIds IDs of partitions to reset. If empty, reset all zone's partitions.
* @return Operation future.
*/
- public CompletableFuture<Void> resetPartitions(int zoneId, int tableId) {
- return processNewRequest(new ManualGroupUpdateRequest(UUID.randomUUID(), zoneId, tableId));
+ public CompletableFuture<Void> resetPartitions(String zoneName, String tableName, Set<Integer> partitionIds) {
+     // Names are resolved against the latest catalog version known to this node.
+     Catalog latestCatalog = catalogManager.catalog(catalogManager.latestCatalogVersion());
+
+     // The table is looked up first, so an unknown table is reported before an unknown zone.
+     int tableId = Optional.ofNullable(latestCatalog.table(tableName))
+             .orElseThrow(() -> new TableNotFoundException(tableName))
+             .id();
+
+     CatalogZoneDescriptor zoneDescriptor = latestCatalog.zone(zoneName);
+
+     if (zoneDescriptor == null) {
+         throw new DistributionZoneNotFoundException(zoneName);
+     }
+
+     // Partition IDs are validated against the zone before the request is created.
+     checkPartitionsRange(partitionIds, Set.of(zoneDescriptor));
+
+     ManualGroupUpdateRequest request = new ManualGroupUpdateRequest(
+             UUID.randomUUID(),
+             zoneDescriptor.id(),
+             tableId,
+             partitionIds
+     );
+
+     return processNewRequest(request);
}
/**
@@ -312,6 +326,26 @@
});
}
+ /**
+  * Validates that every requested partition ID is non-negative and below the partition count of each given zone.
+  *
+  * @param partitionIds IDs of partitions to validate. An empty set is accepted without any checks.
+  * @param zones Zones whose partition counts bound the valid IDs.
+  * @throws IllegalPartitionIdException If the smallest ID is negative, or the largest ID is not less than some zone's partition count.
+  */
+ private static void checkPartitionsRange(Set<Integer> partitionIds, Collection<CatalogZoneDescriptor> zones) {
+     if (partitionIds.isEmpty()) {
+         return;
+     }
+
+     // Only the extremes matter: if they are in range, every other ID is in range as well.
+     int lowestId = Integer.MAX_VALUE;
+     int highestId = Integer.MIN_VALUE;
+
+     for (int partitionId : partitionIds) {
+         lowestId = Math.min(lowestId, partitionId);
+         highestId = Math.max(highestId, partitionId);
+     }
+
+     if (lowestId < 0) {
+         throw new IllegalPartitionIdException(lowestId);
+     }
+
+     for (CatalogZoneDescriptor zone : zones) {
+         if (highestId >= zone.partitions()) {
+             throw new IllegalPartitionIdException(highestId, zone.partitions(), zone.name());
+         }
+     }
+ }
+
private Set<NodeWithAttributes> getNodes(Set<String> nodeNames) throws NodesNotFoundException {
if (nodeNames.isEmpty()) {
return dzManager.logicalTopology();
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
index 063068e..76350e8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
@@ -42,10 +42,13 @@
private final int tableId;
- ManualGroupUpdateRequest(UUID operationId, int zoneId, int tableId) {
+ private final Set<Integer> partitionIds;
+
+ /**
+ * Creates a manual group update request.
+ *
+ * @param operationId Operation ID.
+ * @param zoneId Distribution zone ID.
+ * @param tableId Table ID.
+ * @param partitionIds Partition IDs for this request. {@code null} is stored as an empty set, otherwise a copy made with
+ *      {@link Set#copyOf} is stored, so later changes to the passed set are not visible here.
+ */
+ ManualGroupUpdateRequest(UUID operationId, int zoneId, int tableId, Set<Integer> partitionIds) {
this.operationId = operationId;
this.zoneId = zoneId;
this.tableId = tableId;
+ this.partitionIds = partitionIds == null ? Set.of() : Set.copyOf(partitionIds);
}
@Override
@@ -62,6 +65,10 @@
return tableId;
}
+ /** Returns the partition IDs this request was created with; the set is never {@code null}. */
+ public Set<Integer> partitionIds() {
+ return partitionIds;
+ }
+
@Override
public CompletableFuture<Void> handle(
DisasterRecoveryManager disasterRecoveryManager,
@@ -87,6 +94,7 @@
CompletableFuture<?>[] futures = RebalanceUtil.forceAssignmentsUpdate(
tableDescriptor,
zoneDescriptor,
+ partitionIds,
dataNodes,
nodeConsistentIds,
msRevision,
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/ZonesNotFoundException.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/ZonesNotFoundException.java
index fcf72bb..aadfe42 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/ZonesNotFoundException.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/ZonesNotFoundException.java
@@ -21,7 +21,7 @@
import java.util.Set;
-/** Exception is thrown when appropriate node can`t be found. */
+/** Exception is thrown when appropriate zones can't be found. */
public class ZonesNotFoundException extends DisasterRecoveryException {
private static final long serialVersionUID = -8475588176132321568L;