IGNITE-21257 Introduce REST API for viewing partition states (#3614)
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
new file mode 100644
index 0000000..4fa1b92
--- /dev/null
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import io.micronaut.http.annotation.Controller;
+import io.micronaut.http.annotation.Get;
+import io.micronaut.http.annotation.PathVariable;
+import io.micronaut.http.annotation.Produces;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.media.Content;
+import io.swagger.v3.oas.annotations.media.Schema;
+import io.swagger.v3.oas.annotations.responses.ApiResponse;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.rest.api.Problem;
+import org.apache.ignite.internal.rest.constants.MediaType;
+
+/**
+ * Disaster recovery controller.
+ */
+@Controller("/management/v1/recovery")
+@Tag(name = "recovery")
+public interface DisasterRecoveryApi {
+ @Get("state/local")
+ @Operation(operationId = "getLocalPartitionStates", description = "Returns local partition states.")
+ @ApiResponse(responseCode = "200", description = "Partition states returned.")
+ @ApiResponse(responseCode = "500", description = "Internal error.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
+ @Produces(MediaType.APPLICATION_JSON)
+ CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates();
+
+ @Get("state/local/{zoneName}")
+ @Operation(operationId = "getLocalPartitionStatesByZone", description = "Returns local partition states.")
+ @ApiResponse(responseCode = "200", description = "Partition states returned.")
+ @ApiResponse(responseCode = "500", description = "Internal error.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
+ @ApiResponse(responseCode = "404", description = "Zone is not found.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
+ @Produces(MediaType.APPLICATION_JSON)
+ CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates(@PathVariable("zoneName") String zoneName);
+
+ @Get("state/global")
+ @Operation(operationId = "getGlobalPartitionStates", description = "Returns global partition states.")
+ @ApiResponse(responseCode = "200", description = "Partition states returned.")
+ @ApiResponse(responseCode = "500", description = "Internal error.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
+ @Produces(MediaType.APPLICATION_JSON)
+ CompletableFuture<GlobalPartitionStatesResponse> getGlobalPartitionStates();
+
+ @Get("state/global/{zoneName}")
+ @Operation(operationId = "getGlobalPartitionStatesByZone", description = "Returns global partition states.")
+ @ApiResponse(responseCode = "200", description = "Partition states returned.")
+ @ApiResponse(responseCode = "500", description = "Internal error.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
+ @ApiResponse(responseCode = "404", description = "Zone is not found.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
+ @Produces(MediaType.APPLICATION_JSON)
+ CompletableFuture<GlobalPartitionStatesResponse> getGlobalPartitionStates(@PathVariable("zoneName") String zoneName);
+}
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStateResponse.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStateResponse.java
new file mode 100644
index 0000000..20dbbe9
--- /dev/null
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStateResponse.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.media.Schema;
+
+/**
+ * Global partition state schema class.
+ */
+@Schema(description = "Information about global partition state.")
+public class GlobalPartitionStateResponse {
+ private final int partitionId;
+ private final String tableName;
+ private final String state;
+
+ /**
+ * Constructor.
+ */
+ @JsonCreator
+ public GlobalPartitionStateResponse(
+ @JsonProperty("partitionId") int partitionId,
+ @JsonProperty("tableName") String tableName,
+ @JsonProperty("state") String state
+ ) {
+ this.partitionId = partitionId;
+ this.tableName = tableName;
+ this.state = state;
+ }
+
+ @JsonGetter("partitionId")
+ public int partitionId() {
+ return partitionId;
+ }
+
+ @JsonGetter("tableName")
+ public String tableName() {
+ return tableName;
+ }
+
+ @JsonGetter("state")
+ public String state() {
+ return state;
+ }
+}
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStatesResponse.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStatesResponse.java
new file mode 100644
index 0000000..5031abe
--- /dev/null
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStatesResponse.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.media.Schema;
+import java.util.List;
+
+/**
+ * Global partition states schema class.
+ */
+@Schema(description = "Information about global partition states.")
+public class GlobalPartitionStatesResponse {
+ @Schema
+ private final List<GlobalPartitionStateResponse> states;
+
+ @JsonCreator
+ public GlobalPartitionStatesResponse(@JsonProperty("states") List<GlobalPartitionStateResponse> states) {
+ this.states = List.copyOf(states);
+ }
+
+ @JsonGetter("states")
+ public List<GlobalPartitionStateResponse> states() {
+ return states;
+ }
+}
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java
new file mode 100644
index 0000000..c9d45b5
--- /dev/null
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.media.Schema;
+
+/**
+ * Local partition state schema class.
+ */
+@Schema(description = "Information about local partition state.")
+public class LocalPartitionStateResponse {
+ private final int partitionId;
+ private final String tableName;
+ private final String nodeName;
+ private final String state;
+
+ /**
+ * Constructor.
+ */
+ @JsonCreator
+ public LocalPartitionStateResponse(
+ @JsonProperty("partitionId") int partitionId,
+ @JsonProperty("tableName") String tableName,
+ @JsonProperty("nodeName") String nodeName,
+ @JsonProperty("state") String state
+ ) {
+ this.partitionId = partitionId;
+ this.tableName = tableName;
+ this.nodeName = nodeName;
+ this.state = state;
+ }
+
+ @JsonGetter("partitionId")
+ public int partitionId() {
+ return partitionId;
+ }
+
+ @JsonGetter("tableName")
+ public String tableName() {
+ return tableName;
+ }
+
+ @JsonGetter("nodeName")
+ public String nodeName() {
+ return nodeName;
+ }
+
+ @JsonGetter("state")
+ public String state() {
+ return state;
+ }
+}
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStatesResponse.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStatesResponse.java
new file mode 100644
index 0000000..21c1174
--- /dev/null
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStatesResponse.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.media.Schema;
+import java.util.List;
+
+/**
+ * Local partition states schema class.
+ */
+@Schema(description = "Information about local partition states.")
+public class LocalPartitionStatesResponse {
+ @Schema
+ private final List<LocalPartitionStateResponse> states;
+
+ @JsonCreator
+ public LocalPartitionStatesResponse(@JsonProperty("states") List<LocalPartitionStateResponse> states) {
+ this.states = List.copyOf(states);
+ }
+
+ @JsonGetter("states")
+ public List<LocalPartitionStateResponse> states() {
+ return states;
+ }
+}
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
index 6937a34..98e0d33 100644
--- a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
@@ -26,6 +26,7 @@
import org.apache.ignite.internal.rest.api.Problem;
import org.apache.ignite.internal.rest.constants.HttpCode;
import org.apache.ignite.internal.rest.problem.HttpProblemResponse;
+import org.apache.ignite.lang.ErrorGroups.DistributionZones;
/**
* Handles {@link IgniteInternalException} and represents it as a rest response.
@@ -36,6 +37,15 @@
@Override
public HttpResponse<? extends Problem> handle(HttpRequest request, IgniteInternalException exception) {
+ if (exception.code() == DistributionZones.ZONE_NOT_FOUND_ERR) {
+ return HttpProblemResponse.from(
+ Problem.fromHttpCode(HttpCode.NOT_FOUND)
+ .detail(exception.getMessage())
+ .traceId(exception.traceId())
+ .code(exception.codeAsString())
+ );
+ }
+
return HttpProblemResponse.from(
Problem.fromHttpCode(HttpCode.INTERNAL_SERVER_ERROR)
.traceId(exception.traceId())
diff --git a/modules/rest/build.gradle b/modules/rest/build.gradle
index a7592c8..da836c6 100644
--- a/modules/rest/build.gradle
+++ b/modules/rest/build.gradle
@@ -36,6 +36,7 @@
implementation project(':ignite-network')
implementation project(':ignite-cluster-management')
implementation project(':ignite-metrics')
+ implementation project(':ignite-table')
implementation project(':ignite-code-deployment')
implementation project(':ignite-security-api')
implementation project(':ignite-compute')
@@ -72,6 +73,7 @@
integrationTestImplementation project(':ignite-rest-api')
integrationTestImplementation project(':ignite-network')
integrationTestImplementation project(':ignite-api')
+ integrationTestImplementation project(':ignite-catalog')
integrationTestImplementation project(':ignite-security')
integrationTestImplementation project(':ignite-code-deployment')
integrationTestImplementation project(':ignite-runner')
diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
new file mode 100644
index 0000000..2d3a92a
--- /dev/null
+++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.recovery;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.IntStream.range;
+import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import io.micronaut.http.HttpStatus;
+import io.micronaut.http.client.HttpClient;
+import io.micronaut.http.client.annotation.Client;
+import io.micronaut.http.client.exceptions.HttpClientResponseException;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import jakarta.inject.Inject;
+import java.util.List;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse;
+import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse;
+import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse;
+import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for disaster recovery REST commands.
+ */
+@MicronautTest
+public class ItDisasterRecoveryControllerTest extends ClusterPerTestIntegrationTest {
+ private static final String NODE_URL = "http://localhost:" + Cluster.BASE_HTTP_PORT;
+
+ @Inject
+ @Client(NODE_URL + "/management/v1/recovery/")
+ HttpClient client;
+
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @Test
+ void testLocalPartitionStates() {
+ executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT)");
+ var response = client.toBlocking().exchange("/state/local/", LocalPartitionStatesResponse.class);
+
+ assertEquals(HttpStatus.OK, response.status());
+
+ LocalPartitionStatesResponse body = response.body();
+ assertEquals(DEFAULT_PARTITION_COUNT, body.states().size());
+
+ List<Integer> partitionIds = body.states().stream().map(LocalPartitionStateResponse::partitionId).collect(toList());
+ assertEquals(range(0, DEFAULT_PARTITION_COUNT).boxed().collect(toList()), partitionIds);
+ }
+
+ @Test
+ void testLocalPartitionStatesByZoneMissingZone() {
+ HttpClientResponseException thrown = assertThrows(
+ HttpClientResponseException.class,
+ () -> client.toBlocking().exchange("/state/local/no-such-zone/", LocalPartitionStatesResponse.class)
+ );
+
+ assertEquals(HttpStatus.NOT_FOUND, thrown.getResponse().status());
+ }
+
+ @Test
+ void testLocalPartitionStatesByZone() {
+ executeSql("CREATE TABLE def (id INT PRIMARY KEY, val INT)");
+
+ executeSql("CREATE ZONE foo WITH partitions=1, storage_profiles='" + DEFAULT_AIPERSIST_PROFILE_NAME + "'");
+ executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT) WITH PRIMARY_ZONE = 'FOO'");
+
+ var response = client.toBlocking().exchange("/state/local/Default/", LocalPartitionStatesResponse.class);
+
+ assertEquals(HttpStatus.OK, response.status());
+ assertEquals(DEFAULT_PARTITION_COUNT, response.body().states().size());
+
+ response = client.toBlocking().exchange("/state/local/FOO/", LocalPartitionStatesResponse.class);
+
+ assertEquals(HttpStatus.OK, response.status());
+
+ List<LocalPartitionStateResponse> states = response.body().states();
+ assertEquals(1, states.size());
+
+ LocalPartitionStateResponse state = states.get(0);
+ assertEquals(0, state.partitionId());
+ assertEquals("idrct_tlpsbz_0", state.nodeName());
+ assertEquals("FOO", state.tableName());
+ assertEquals("HEALTHY", state.state());
+ }
+
+ @Test
+ void testGlobalPartitionStates() {
+ executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT)");
+ var response = client.toBlocking().exchange("/state/global/", GlobalPartitionStatesResponse.class);
+
+ assertEquals(HttpStatus.OK, response.status());
+
+ GlobalPartitionStatesResponse body = response.body();
+ assertEquals(DEFAULT_PARTITION_COUNT, body.states().size());
+
+ List<Integer> partitionIds = body.states().stream().map(GlobalPartitionStateResponse::partitionId).collect(toList());
+ assertEquals(range(0, DEFAULT_PARTITION_COUNT).boxed().collect(toList()), partitionIds);
+ }
+
+ @Test
+ void testGlobalPartitionStatesByZoneMissingZone() {
+ HttpClientResponseException thrown = assertThrows(
+ HttpClientResponseException.class,
+ () -> client.toBlocking().exchange("/state/global/no-such-zone/", GlobalPartitionStatesResponse.class)
+ );
+
+ assertEquals(HttpStatus.NOT_FOUND, thrown.getResponse().status());
+ }
+
+ @Test
+ void testGlobalPartitionStatesByZone() {
+ executeSql("CREATE TABLE def (id INT PRIMARY KEY, val INT)");
+
+ executeSql("CREATE ZONE foo WITH partitions=1, storage_profiles='" + DEFAULT_AIPERSIST_PROFILE_NAME + "'");
+ executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT) WITH PRIMARY_ZONE = 'FOO'");
+
+ var response = client.toBlocking().exchange("/state/global/Default/", GlobalPartitionStatesResponse.class);
+
+ assertEquals(HttpStatus.OK, response.status());
+ assertEquals(DEFAULT_PARTITION_COUNT, response.body().states().size());
+
+ response = client.toBlocking().exchange("/state/global/FOO/", GlobalPartitionStatesResponse.class);
+
+ assertEquals(HttpStatus.OK, response.status());
+
+ List<GlobalPartitionStateResponse> states = response.body().states();
+ assertEquals(1, states.size());
+
+ GlobalPartitionStateResponse state = states.get(0);
+ assertEquals(0, state.partitionId());
+ assertEquals("FOO", state.tableName());
+ assertEquals("AVAILABLE", state.state());
+ }
+}
diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
new file mode 100644
index 0000000..4598ab9
--- /dev/null
+++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.recovery;
+
+import static java.util.Comparator.comparing;
+
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.http.annotation.Controller;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.rest.api.recovery.DisasterRecoveryApi;
+import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse;
+import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse;
+import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse;
+import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse;
+import org.apache.ignite.internal.rest.exception.handler.IgniteInternalExceptionHandler;
+import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
+import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
+import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionState;
+
+/**
+ * Disaster recovery controller.
+ */
+@Controller("/management/v1/recovery/")
+@Requires(classes = IgniteInternalExceptionHandler.class)
+public class DisasterRecoveryController implements DisasterRecoveryApi {
+ private final DisasterRecoveryManager disasterRecoveryManager;
+
+ public DisasterRecoveryController(DisasterRecoveryManager disasterRecoveryManager) {
+ this.disasterRecoveryManager = disasterRecoveryManager;
+ }
+
+ @Override
+ public CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates() {
+ return disasterRecoveryManager.localPartitionStates(null).thenApply(DisasterRecoveryController::convertLocalStates);
+ }
+
+ @Override
+ public CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates(String zoneName) {
+ return disasterRecoveryManager.localPartitionStates(zoneName).thenApply(DisasterRecoveryController::convertLocalStates);
+ }
+
+ @Override
+ public CompletableFuture<GlobalPartitionStatesResponse> getGlobalPartitionStates() {
+ return disasterRecoveryManager.globalPartitionStates(null).thenApply(DisasterRecoveryController::convertGlobalStates);
+ }
+
+ @Override
+ public CompletableFuture<GlobalPartitionStatesResponse> getGlobalPartitionStates(String zoneName) {
+ return disasterRecoveryManager.globalPartitionStates(zoneName).thenApply(DisasterRecoveryController::convertGlobalStates);
+ }
+
+ private static LocalPartitionStatesResponse convertLocalStates(Map<TablePartitionId, Map<String, LocalPartitionState>> localStates) {
+ List<LocalPartitionStateResponse> states = new ArrayList<>();
+
+ for (Map<String, LocalPartitionState> map : localStates.values()) {
+ for (Entry<String, LocalPartitionState> entry : map.entrySet()) {
+ String nodeName = entry.getKey();
+ LocalPartitionState state = entry.getValue();
+
+ states.add(new LocalPartitionStateResponse(
+ state.partitionId,
+ state.tableName,
+ nodeName,
+ state.state.name()
+ ));
+ }
+ }
+
+ // Sort the output conveniently.
+ states.sort(comparing(LocalPartitionStateResponse::tableName)
+ .thenComparingInt(LocalPartitionStateResponse::partitionId)
+ .thenComparing(LocalPartitionStateResponse::nodeName));
+
+ return new LocalPartitionStatesResponse(states);
+ }
+
+ private static GlobalPartitionStatesResponse convertGlobalStates(Map<TablePartitionId, GlobalPartitionState> globalStates) {
+ List<GlobalPartitionStateResponse> states = new ArrayList<>();
+
+ for (GlobalPartitionState state : globalStates.values()) {
+ states.add(new GlobalPartitionStateResponse(
+ state.partitionId,
+ state.tableName,
+ state.state.name()
+ ));
+ }
+
+ // Sort the output conveniently.
+ states.sort(comparing(GlobalPartitionStateResponse::tableName)
+ .thenComparingInt(GlobalPartitionStateResponse::partitionId));
+
+ return new GlobalPartitionStatesResponse(states);
+ }
+}
diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryFactory.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryFactory.java
new file mode 100644
index 0000000..a5b3814
--- /dev/null
+++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.recovery;
+
+import io.micronaut.context.annotation.Bean;
+import io.micronaut.context.annotation.Factory;
+import jakarta.inject.Singleton;
+import org.apache.ignite.internal.rest.RestFactory;
+import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
+
+/**
+ * Factory that defines beans required for the rest module.
+ */
+@Factory
+public class DisasterRecoveryFactory implements RestFactory {
+ private DisasterRecoveryManager disasterRecoveryManager;
+
+ public DisasterRecoveryFactory(DisasterRecoveryManager disasterRecoveryManager) {
+ this.disasterRecoveryManager = disasterRecoveryManager;
+ }
+
+ @Bean
+ @Singleton
+ public DisasterRecoveryManager disasterRecoveryManager() {
+ return disasterRecoveryManager;
+ }
+
+ @Override
+ public void cleanResources() {
+ disasterRecoveryManager = null;
+ }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index ce4fc2e..27dba1d 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -170,6 +170,7 @@
import org.apache.ignite.internal.rest.deployment.CodeDeploymentRestFactory;
import org.apache.ignite.internal.rest.metrics.MetricRestFactory;
import org.apache.ignite.internal.rest.node.NodeManagementRestFactory;
+import org.apache.ignite.internal.rest.recovery.DisasterRecoveryFactory;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
@@ -935,6 +936,7 @@
Supplier<RestFactory> deploymentCodeRestFactory = () -> new CodeDeploymentRestFactory(deploymentManager);
Supplier<RestFactory> restManagerFactory = () -> new RestManagerFactory(restManager);
Supplier<RestFactory> computeRestFactory = () -> new ComputeRestFactory(compute);
+ Supplier<RestFactory> disasterRecoveryFactory = () -> new DisasterRecoveryFactory(disasterRecoveryManager);
RestConfiguration restConfiguration = nodeCfgMgr.configurationRegistry().getConfiguration(RestConfiguration.KEY);
@@ -946,7 +948,8 @@
deploymentCodeRestFactory,
authProviderFactory,
restManagerFactory,
- computeRestFactory
+ computeRestFactory,
+ disasterRecoveryFactory
),
restManager,
restConfiguration
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index d7a980c..b0d5028 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -60,7 +60,7 @@
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
-import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionState;
+import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionState;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.KeyValueView;
@@ -326,7 +326,7 @@
}
private List<Integer> getRealAssignments(IgniteImpl node0, int partId) {
- var partitionStatesFut = node0.disasterRecoveryManager().partitionStates(zoneName);
+ var partitionStatesFut = node0.disasterRecoveryManager().localPartitionStates(zoneName);
assertThat(partitionStatesFut, willCompleteSuccessfully());
Map<String, LocalPartitionState> partitionStates = partitionStatesFut.join().get(new TablePartitionId(tableId, partId));
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 63cc832..3b1a1dc 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -28,7 +28,7 @@
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand;
import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
-import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionState;
+import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStateMessage;
import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesRequest;
import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesResponse;
import org.apache.ignite.internal.table.distributed.message.HasDataRequest;
@@ -216,7 +216,7 @@
* Messages for {@link DisasterRecoveryManager}.
*/
interface DisasterRecoveryMessages {
- /** Message type for {@link LocalPartitionState}. */
+ /** Message type for {@link LocalPartitionStateMessage}. */
short LOCAL_PARTITION_STATE = 100;
/** Message type for {@link LocalPartitionStatesRequest}. */
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 98bb2ca..0961c1a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -17,6 +17,17 @@
package org.apache.ignite.internal.table.distributed.disaster;
+import static java.util.Collections.emptyList;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.AVAILABLE;
+import static org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.DEGRADED;
+import static org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.READ_ONLY;
+import static org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.BROKEN;
+import static org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.CATCHING_UP;
+import static org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.HEALTHY;
+import static org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.INITIALIZING;
+import static org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.INSTALLING_SNAPSHOT;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.ArrayList;
@@ -30,8 +41,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.apache.ignite.internal.affinity.Assignments;
+import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
@@ -39,6 +50,7 @@
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
@@ -52,7 +64,7 @@
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
-import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionState;
+import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStateMessage;
import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesRequest;
import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesResponse;
import org.apache.ignite.internal.util.ByteUtils;
@@ -76,9 +88,17 @@
private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory();
- private static final int TIMEOUT = 30;
+ /** Disaster recovery operations timeout in seconds. */
+ private static final int TIMEOUT_SECONDS = 30;
- private static final int CATCH_UP_THRESHOLD = 10;
+ /**
+ * Maximal allowed difference between committed index on the leader and on the follower, that differentiates
+ * {@link LocalPartitionStateEnum#HEALTHY} from {@link LocalPartitionStateEnum#CATCHING_UP}.
+ */
+ private static final int CATCH_UP_THRESHOLD = 100;
+
+ /** Zone ID that corresponds to "all zones". */
+ private static final int NO_ZONE_ID = -1;
/** Thread pool executor for async parts. */
private final ExecutorService threadPool;
@@ -152,6 +172,10 @@
@Override
public void stop() throws Exception {
metaStorageManager.unregisterWatch(watchListener);
+
+ for (CompletableFuture<Void> future : ongoingOperationsById.values()) {
+ future.completeExceptionally(new NodeStoppingException());
+ }
}
/**
@@ -172,29 +196,58 @@
* Returns partition states for all zones' partitions in the cluster. Result is a mapping of {@link TablePartitionId} to the mapping
* between a node name and a partition state.
*
- * @param zoneName Zone name.
+ * @param zoneName Zone name. {@code null} means "all zones".
* @return Future with the mapping.
*/
- public CompletableFuture<Map<TablePartitionId, Map<String, LocalPartitionState>>> partitionStates(String zoneName) {
- int latestCatalogVersion = catalogManager.latestCatalogVersion();
- Optional<CatalogZoneDescriptor> zoneDesciptorOptional = catalogManager.zones(latestCatalogVersion).stream()
- .filter(catalogZoneDescriptor -> catalogZoneDescriptor.name().equals(zoneName))
- .findAny();
+ public CompletableFuture<Map<TablePartitionId, Map<String, LocalPartitionState>>> localPartitionStates(@Nullable String zoneName) {
+ Catalog catalog = catalogManager.catalog(catalogManager.latestCatalogVersion());
- if (zoneDesciptorOptional.isEmpty()) {
- return CompletableFuture.failedFuture(new DistributionZoneNotFoundException(zoneName, null));
+ return localPartitionStatesInternal(zoneName, catalog)
+ .thenApply(res -> normalizeLocal(res, catalog));
+ }
+
+ /**
+ * Returns partition states for all zones' partitions in the cluster. Result is a mapping of {@link TablePartitionId} to the global
+ * partition state enum value.
+ *
+ * @param zoneName Zone name. {@code null} means "all zones".
+ * @return Future with the mapping.
+ */
+ public CompletableFuture<Map<TablePartitionId, GlobalPartitionState>> globalPartitionStates(@Nullable String zoneName) {
+ Catalog catalog = catalogManager.catalog(catalogManager.latestCatalogVersion());
+
+ return localPartitionStatesInternal(zoneName, catalog)
+ .thenApply(res -> normalizeLocal(res, catalog))
+ .thenApply(res -> assembleGlobal(res, catalog));
+ }
+
+ private CompletableFuture<Map<TablePartitionId, Map<String, LocalPartitionStateMessage>>> localPartitionStatesInternal(
+ @Nullable String zoneName, Catalog catalog
+ ) {
+ int zoneId;
+ if (zoneName == null) {
+ zoneId = NO_ZONE_ID;
+ } else {
+ Optional<CatalogZoneDescriptor> zoneDesciptorOptional = catalog.zones().stream()
+ .filter(catalogZoneDescriptor -> catalogZoneDescriptor.name().equals(zoneName))
+ .findAny();
+
+ if (zoneDesciptorOptional.isEmpty()) {
+ return CompletableFuture.failedFuture(new DistributionZoneNotFoundException(zoneName, null));
+ }
+
+ CatalogZoneDescriptor zoneDescriptor = zoneDesciptorOptional.get();
+ zoneId = zoneDescriptor.id();
}
- CatalogZoneDescriptor zoneDescriptor = zoneDesciptorOptional.get();
-
Set<NodeWithAttributes> logicalTopology = dzManager.logicalTopology();
LocalPartitionStatesRequest localPartitionStatesRequest = MSG_FACTORY.localPartitionStatesRequest()
- .zoneId(zoneDescriptor.id())
- .catalogVersion(latestCatalogVersion)
+ .zoneId(zoneId)
+ .catalogVersion(catalog.version())
.build();
- Map<TablePartitionId, Map<String, LocalPartitionState>> result = new ConcurrentHashMap<>();
+ Map<TablePartitionId, Map<String, LocalPartitionStateMessage>> result = new ConcurrentHashMap<>();
CompletableFuture<?>[] futures = new CompletableFuture[logicalTopology.size()];
int i = 0;
@@ -202,7 +255,7 @@
CompletableFuture<NetworkMessage> invokeFuture = messagingService.invoke(
node.nodeName(),
localPartitionStatesRequest,
- TimeUnit.SECONDS.toMillis(TIMEOUT)
+ TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS)
);
futures[i++] = invokeFuture.thenAccept(networkMessage -> {
@@ -210,7 +263,7 @@
var response = (LocalPartitionStatesResponse) networkMessage;
- for (LocalPartitionState state : response.states()) {
+ for (LocalPartitionStateMessage state : response.states()) {
result.compute(state.partitionId().asTablePartitionId(), (tablePartitionId, map) -> {
if (map == null) {
return Map.of(node.nodeName(), state);
@@ -224,7 +277,7 @@
});
}
- return CompletableFuture.allOf(futures).handle((unused, throwable) -> normalize(result));
+ return CompletableFuture.allOf(futures).handle((unused, throwable) -> result);
}
/**
@@ -238,7 +291,7 @@
CompletableFuture<Void> operationFuture = new CompletableFuture<Void>()
.whenComplete((v, throwable) -> ongoingOperationsById.remove(operationId))
- .orTimeout(TIMEOUT, TimeUnit.SECONDS);
+ .orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
ongoingOperationsById.put(operationId, operationFuture);
@@ -287,14 +340,15 @@
int catalogVersion = request.catalogVersion();
catalogManager.catalogReadyFuture(catalogVersion).thenRunAsync(() -> {
- List<LocalPartitionState> statesList = new ArrayList<>();
+ List<LocalPartitionStateMessage> statesList = new ArrayList<>();
raftManager.forEach((raftNodeId, raftGroupService) -> {
if (raftNodeId.groupId() instanceof TablePartitionId) {
var tablePartitionId = (TablePartitionId) raftNodeId.groupId();
CatalogTableDescriptor tableDescriptor = catalogManager.table(tablePartitionId.tableId(), catalogVersion);
- if (tableDescriptor == null || tableDescriptor.zoneId() != request.zoneId()) {
+ // Only tables that belong to a specific catalog version will be returned.
+ if (tableDescriptor == null || request.zoneId() != NO_ZONE_ID && tableDescriptor.zoneId() != request.zoneId()) {
return;
}
@@ -304,18 +358,18 @@
LocalPartitionStateEnum localState = convertState(nodeState);
long lastLogIndex = raftNode.lastLogIndex();
- if (localState == LocalPartitionStateEnum.HEALTHY) {
+ if (localState == HEALTHY) {
// Node without log didn't process anything yet, it's not really "healthy" before it accepts leader's configuration.
if (lastLogIndex == 0) {
- localState = LocalPartitionStateEnum.INITIALIZING;
+ localState = INITIALIZING;
}
if (raftNode.isInstallingSnapshot()) {
- localState = LocalPartitionStateEnum.INSTALLING_SNAPSHOT;
+ localState = INSTALLING_SNAPSHOT;
}
}
- statesList.add(MSG_FACTORY.localPartitionState()
+ statesList.add(MSG_FACTORY.localPartitionStateMessage()
.partitionId(MSG_FACTORY.tablePartitionIdMessage()
.tableId(tablePartitionId.tableId())
.partitionId(tablePartitionId.partitionId())
@@ -342,13 +396,13 @@
case STATE_TRANSFERRING:
case STATE_CANDIDATE:
case STATE_FOLLOWER:
- return LocalPartitionStateEnum.HEALTHY;
+ return HEALTHY;
case STATE_ERROR:
- return LocalPartitionStateEnum.BROKEN;
+ return BROKEN;
case STATE_UNINITIALIZED:
- return LocalPartitionStateEnum.INITIALIZING;
+ return INITIALIZING;
case STATE_SHUTTING:
case STATE_SHUTDOWN:
@@ -357,36 +411,105 @@
default:
// Unrecognized state, better safe than sorry.
- return LocalPartitionStateEnum.BROKEN;
+ return BROKEN;
}
}
/**
- * Replaces some healthy states with a {@link LocalPartitionStateEnum#CATCHING_UP},it can only be done once the state of all peers is
+ * Replaces some healthy states with a {@link LocalPartitionStateEnum#CATCHING_UP}, it can only be done once the state of all peers is
* known.
*/
- private Map<TablePartitionId, Map<String, LocalPartitionState>> normalize(
- Map<TablePartitionId, Map<String, LocalPartitionState>> result
+ private static Map<TablePartitionId, Map<String, LocalPartitionState>> normalizeLocal(
+ Map<TablePartitionId, Map<String, LocalPartitionStateMessage>> result,
+ Catalog catalog
) {
- return result.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
- Map<String, LocalPartitionState> map = entry.getValue();
+ return result.entrySet().stream().collect(toMap(Map.Entry::getKey, entry -> {
+ TablePartitionId tablePartitionId = entry.getKey();
+ Map<String, LocalPartitionStateMessage> map = entry.getValue();
// noinspection OptionalGetWithoutIsPresent
- long maxLogIndex = map.values().stream().mapToLong(LocalPartitionState::logIndex).max().getAsLong();
+ long maxLogIndex = map.values().stream().mapToLong(LocalPartitionStateMessage::logIndex).max().getAsLong();
- return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry2 -> {
- LocalPartitionState state = entry2.getValue();
+ return map.entrySet().stream().collect(toMap(Map.Entry::getKey, entry2 -> {
+ LocalPartitionStateMessage stateMsg = entry2.getValue();
- if (state.state() != LocalPartitionStateEnum.HEALTHY || maxLogIndex - state.logIndex() < CATCH_UP_THRESHOLD) {
- return state;
+ LocalPartitionStateEnum stateEnum = stateMsg.state();
+
+ if (stateMsg.state() == HEALTHY && maxLogIndex - stateMsg.logIndex() >= CATCH_UP_THRESHOLD) {
+ stateEnum = CATCHING_UP;
}
- return MSG_FACTORY.localPartitionState()
- .state(LocalPartitionStateEnum.CATCHING_UP)
- .partitionId(state.partitionId())
- .logIndex(state.logIndex())
- .build();
+ // Tables, returned from local states request, are always present in the required version of the catalog.
+ CatalogTableDescriptor tableDescriptor = catalog.table(tablePartitionId.tableId());
+ return new LocalPartitionState(tableDescriptor.name(), tablePartitionId.partitionId(), stateEnum);
}));
}));
}
+
+ private static Map<TablePartitionId, GlobalPartitionState> assembleGlobal(
+ Map<TablePartitionId, Map<String, LocalPartitionState>> localResult,
+ Catalog catalog
+ ) {
+ Map<TablePartitionId, GlobalPartitionState> result = localResult.entrySet().stream()
+ .collect(toMap(Map.Entry::getKey, entry -> {
+ TablePartitionId tablePartitionId = entry.getKey();
+ Map<String, LocalPartitionState> map = entry.getValue();
+
+ return assembleGlobalStateFromLocal(catalog, tablePartitionId, map);
+ }));
+
+ localResult.keySet().stream()
+ .map(TablePartitionId::tableId)
+ .distinct()
+ .forEach(tableId -> {
+ int zoneId = catalog.table(tableId).zoneId();
+ CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+ int partitions = zoneDescriptor.partitions();
+
+ // Make missing partitions explicitly unavailable.
+ for (int partitionId = 0; partitionId < partitions; partitionId++) {
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId, partitionId);
+
+ result.computeIfAbsent(tablePartitionId, key ->
+ new GlobalPartitionState(catalog.table(key.tableId()).name(), key.partitionId(),
+ GlobalPartitionStateEnum.UNAVAILABLE)
+ );
+ }
+ });
+
+ return result;
+ }
+
+ private static GlobalPartitionState assembleGlobalStateFromLocal(
+ Catalog catalog,
+ TablePartitionId tablePartitionId,
+ Map<String, LocalPartitionState> map
+ ) {
+ // Tables, returned from local states request, are always present in the required version of the catalog.
+ int zoneId = catalog.table(tablePartitionId.tableId()).zoneId();
+ CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+
+ int replicas = zoneDescriptor.replicas();
+ int quorum = replicas / 2 + 1;
+
+ Map<LocalPartitionStateEnum, List<LocalPartitionState>> groupedStates = map.values().stream()
+ .collect(groupingBy(localPartitionState -> localPartitionState.state));
+
+ GlobalPartitionStateEnum globalStateEnum;
+
+ int healthyReplicas = groupedStates.getOrDefault(HEALTHY, emptyList()).size();
+
+ if (healthyReplicas == replicas) {
+ globalStateEnum = AVAILABLE;
+ } else if (healthyReplicas >= quorum) {
+ globalStateEnum = DEGRADED;
+ } else if (healthyReplicas > 0) {
+ globalStateEnum = READ_ONLY;
+ } else {
+ globalStateEnum = GlobalPartitionStateEnum.UNAVAILABLE;
+ }
+
+ LocalPartitionState anyLocalState = map.values().iterator().next();
+ return new GlobalPartitionState(anyLocalState.tableName, tablePartitionId.partitionId(), globalStateEnum);
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionState.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionState.java
new file mode 100644
index 0000000..816bfc6
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionState.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+/**
+ * Global partition state.
+ */
+public class GlobalPartitionState {
+ public final String tableName;
+
+ public final int partitionId;
+
+ public final GlobalPartitionStateEnum state;
+
+ GlobalPartitionState(String tableName, int partitionId, GlobalPartitionStateEnum state) {
+ this.tableName = tableName;
+ this.partitionId = partitionId;
+ this.state = state;
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionStateEnum.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionStateEnum.java
new file mode 100644
index 0000000..f2b6be6
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionStateEnum.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+/**
+ * Enum for states of partitions.
+ */
+public enum GlobalPartitionStateEnum {
+ /** All replicas are healthy. */
+ AVAILABLE,
+
+ /** There are healthy replicas, and they form a majority. */
+ DEGRADED,
+
+ /** There are healthy replicas, but they don't form a majority. */
+ READ_ONLY,
+
+ /** There are no healthy replicas. */
+ UNAVAILABLE
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
new file mode 100644
index 0000000..0db3f39
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+/**
+ * Local partition state.
+ */
+public class LocalPartitionState {
+ public final String tableName;
+
+ public final int partitionId;
+
+ public final LocalPartitionStateEnum state;
+
+ LocalPartitionState(String tableName, int partitionId, LocalPartitionStateEnum state) {
+ this.tableName = tableName;
+ this.partitionId = partitionId;
+ this.state = state;
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionState.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStateMessage.java
similarity index 95%
rename from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionState.java
rename to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStateMessage.java
index 8f9971d..49e9e7c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionState.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStateMessage.java
@@ -28,7 +28,7 @@
* Local partition state message, has partition ID, state and last committed log index.
*/
@Transferable(DisasterRecoveryMessages.LOCAL_PARTITION_STATE)
-public interface LocalPartitionState extends NetworkMessage {
+public interface LocalPartitionStateMessage extends NetworkMessage {
/** Partition ID. */
TablePartitionIdMessage partitionId();
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStatesResponse.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStatesResponse.java
index b1ba925..e55f978 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStatesResponse.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStatesResponse.java
@@ -27,5 +27,5 @@
*/
@Transferable(DisasterRecoveryMessages.LOCAL_PARTITION_STATE_RESPONSE)
public interface LocalPartitionStatesResponse extends NetworkMessage {
- List<LocalPartitionState> states();
+ List<LocalPartitionStateMessage> states();
}