[FLINK-32111] Add Null check for checkpoints history obj
diff --git a/docs/content/docs/operations/plugins.md b/docs/content/docs/operations/plugins.md
index efbf34e..b25f055 100644
--- a/docs/content/docs/operations/plugins.md
+++ b/docs/content/docs/operations/plugins.md
@@ -107,7 +107,7 @@
In order to enable your custom `FlinkResourceListener` you need to:
1. Implement the interface
- 2. Add your listener class to `org.apache.flink.kubernetes.operator.listener.FlinkResourceListener` in `META-INF/services`
+ 2. Add your listener class to `org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener` in `META-INF/services`
3. Package your JAR and add it to the plugins directory of your operator image (`/opt/flink/plugins`)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
index f53d074..99a9abc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
@@ -57,7 +57,7 @@
private ArrayNode history;
public Optional<PendingCheckpointInfo> getInProgressCheckpoint() {
- if (history.isEmpty()) {
+ if (history == null || history.isEmpty()) {
return Optional.empty();
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index d723901..805cf58 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -66,11 +66,13 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -308,6 +310,27 @@
}
@Test
+ public void testGetInProgressCheckpointsFromResponseWithoutHistoryDetails()
+ throws JsonProcessingException {
+ ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+ String response =
+ "{\"counts\":{\"restored\":0,\"total\":2,\"in_progress\":0,\"completed\":2,\"failed\":0}}";
+ var checkpointHistoryWrapper =
+ objectMapper.readValue(response, CheckpointHistoryWrapper.class);
+ Optional<CheckpointHistoryWrapper.PendingCheckpointInfo> optionalPendingCheckpointInfo =
+ assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint);
+ assertTrue(optionalPendingCheckpointInfo.isEmpty());
+ }
+
+ @Test
+ public void testGetInProgressCheckpointsWithoutHistory() {
+ CheckpointHistoryWrapper checkpointHistoryWrapper = new CheckpointHistoryWrapper();
+ Optional<CheckpointHistoryWrapper.PendingCheckpointInfo> optionalPendingCheckpointInfo =
+ assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint);
+ assertTrue(optionalPendingCheckpointInfo.isEmpty());
+ }
+
+ @Test
public void testClusterInfoRestCompatibility() throws JsonProcessingException {
ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();