blob: 03062a8ceddf11a4e69eba1f9d5e2ec695656f0d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.operator.observer.deployment;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.observer.TestObserverAdapter;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import lombok.Getter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getCheckpointInfo;
import static org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getJobStatus;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** {@link ApplicationObserver} unit tests. */
@EnableKubernetesMockClient(crud = true)
public class ApplicationObserverTest extends OperatorTestBase {
@Getter private KubernetesClient kubernetesClient;
private Context<FlinkDeployment> readyContext;
private TestObserverAdapter<FlinkDeployment> observer;
@Override
public void setup() {
observer = new TestObserverAdapter<>(this, new ApplicationObserver(eventRecorder));
readyContext = TestUtils.createContextWithReadyJobManagerDeployment(kubernetesClient);
}
@Test
public void observeApplicationCluster() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Configuration conf =
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
observer.observe(deployment, TestUtils.createEmptyContext());
assertNull(deployment.getStatus().getReconciliationStatus().getLastStableSpec());
flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
// Validate port check logic
flinkService.setPortReady(false);
// Port not ready
observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.DEPLOYING,
deployment.getStatus().getJobManagerDeploymentStatus());
assertNull(deployment.getStatus().getReconciliationStatus().getLastStableSpec());
observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.DEPLOYING,
deployment.getStatus().getJobManagerDeploymentStatus());
assertNull(deployment.getStatus().getReconciliationStatus().getLastStableSpec());
flinkService.setPortReady(true);
// Port ready but we have to recheck once again
observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
deployment.getStatus().getJobManagerDeploymentStatus());
assertNull(deployment.getStatus().getReconciliationStatus().getLastStableSpec());
assertTrue(deployment.getStatus().getClusterInfo().isEmpty());
// Stable ready
observer.observe(deployment, readyContext);
assertEquals(TestingFlinkService.CLUSTER_INFO, deployment.getStatus().getClusterInfo());
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
assertEquals(JobState.RUNNING.name(), deployment.getStatus().getJobStatus().getState());
assertEquals(
deployment.getStatus().getReconciliationStatus().getLastReconciledSpec(),
deployment.getStatus().getReconciliationStatus().getLastStableSpec());
observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
assertEquals(JobState.RUNNING.name(), deployment.getStatus().getJobStatus().getState());
assertEquals(
deployment.getMetadata().getName(),
deployment.getStatus().getJobStatus().getJobName());
assertTrue(
Long.valueOf(deployment.getStatus().getJobStatus().getUpdateTime())
.compareTo(
Long.valueOf(
deployment
.getStatus()
.getJobStatus()
.getStartTime()))
>= 0);
// Test job manager is unavailable suddenly
flinkService.setPortReady(false);
observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.DEPLOYING,
deployment.getStatus().getJobManagerDeploymentStatus());
// Job manager recovers
flinkService.setPortReady(true);
observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
deployment.getStatus().getJobManagerDeploymentStatus());
observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
// Test listing failure
deployment.getStatus().getReconciliationStatus().setLastStableSpec(null);
flinkService.clear();
observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
assertEquals(
org.apache.flink.api.common.JobStatus.RECONCILING.name(),
deployment.getStatus().getJobStatus().getState());
assertNull(deployment.getStatus().getReconciliationStatus().getLastStableSpec());
}
@Test
public void testEventGeneratedWhenStatusChanged() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Configuration conf =
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
deployment.setStatus(deployment.initStatus());
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
observer.observe(deployment, readyContext);
var eventList =
kubernetesClient
.v1()
.events()
.inNamespace(deployment.getMetadata().getNamespace())
.list()
.getItems();
Assertions.assertEquals(1, eventList.size());
Assertions.assertEquals("Job status changed to RUNNING", eventList.get(0).getMessage());
observer.observe(deployment, readyContext);
Assertions.assertEquals(
1,
kubernetesClient
.v1()
.events()
.inNamespace(deployment.getMetadata().getNamespace())
.list()
.getItems()
.size());
}
@Test
public void testErrorForwardToStatusWhenJobFailed() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Configuration conf =
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
deployment.setStatus(deployment.initStatus());
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
observer.observe(deployment, readyContext);
Assertions.assertEquals(1, flinkService.getRunningCount());
flinkService.markApplicationJobFailedWithError(
JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId()),
"Job failed");
observer.observe(deployment, readyContext);
Assertions.assertEquals(0, flinkService.getRunningCount());
Assertions.assertTrue(deployment.getStatus().getError().contains("Job failed"));
}
@Test
public void observeSavepoint() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Long timedOutNonce = 1L;
deployment.getSpec().getJob().setSavepointTriggerNonce(timedOutNonce);
Configuration conf =
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
bringToReadyStatus(deployment);
assertTrue(ReconciliationUtils.isJobRunning(deployment.getStatus()));
flinkService.triggerSavepoint(
deployment.getStatus().getJobStatus().getJobId(),
SnapshotTriggerType.MANUAL,
deployment.getStatus().getJobStatus().getSavepointInfo(),
conf);
// pending savepoint
assertEquals(
"savepoint_trigger_0",
deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
observer.observe(deployment, readyContext);
assertTrue(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
deployment.getStatus().getJobStatus().getSavepointInfo().setTriggerId("unknown");
// savepoint error within grace period
assertEquals(
0,
(int)
kubernetesClient
.v1()
.events()
.inNamespace(deployment.getMetadata().getNamespace())
.list()
.getItems()
.stream()
.filter(e -> e.getReason().contains("SavepointError"))
.count());
observer.observe(deployment, readyContext);
assertFalse(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
assertEquals(
1,
kubernetesClient
.v1()
.events()
.inNamespace(deployment.getMetadata().getNamespace())
.list()
.getItems()
.stream()
.filter(e -> e.getReason().contains("SavepointError"))
.count());
deployment.getStatus().getJobStatus().getSavepointInfo().setTriggerId("unknown");
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.setTriggerType(SnapshotTriggerType.MANUAL);
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.setTriggerTimestamp(Instant.now().minus(Duration.ofHours(1)).toEpochMilli());
observer.observe(deployment, readyContext);
assertFalse(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
assertEquals(
1,
kubernetesClient
.v1()
.events()
.inNamespace(deployment.getMetadata().getNamespace())
.list()
.getItems()
.stream()
.filter(e -> e.getReason().contains("SavepointError"))
.filter(
e ->
e.getMessage()
.equals(
"Savepoint failed for savepointTriggerNonce: "
+ timedOutNonce))
.count());
assertEquals(
2,
kubernetesClient
.v1()
.events()
.inNamespace(deployment.getMetadata().getNamespace())
.list()
.getItems()
.stream()
.filter(e -> e.getReason().contains("SavepointError"))
.filter(
e ->
e.getMessage()
.equals(
"Savepoint failed for savepointTriggerNonce: "
+ timedOutNonce))
.collect(Collectors.toList())
.get(0)
.getCount());
// savepoint success
Long firstNonce = 123L;
deployment.getSpec().getJob().setSavepointTriggerNonce(firstNonce);
flinkService.triggerSavepoint(
deployment.getStatus().getJobStatus().getJobId(),
SnapshotTriggerType.MANUAL,
deployment.getStatus().getJobStatus().getSavepointInfo(),
conf);
assertEquals(
"savepoint_trigger_1",
deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
observer.observe(deployment, readyContext);
observer.observe(deployment, readyContext);
assertEquals(
"savepoint_0",
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.getLastSavepoint()
.getLocation());
assertEquals(
firstNonce,
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.getLastSavepoint()
.getTriggerNonce());
assertFalse(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
// second attempt success
Long secondNonce = 456L;
deployment.getSpec().getJob().setSavepointTriggerNonce(secondNonce);
flinkService.triggerSavepoint(
deployment.getStatus().getJobStatus().getJobId(),
SnapshotTriggerType.MANUAL,
deployment.getStatus().getJobStatus().getSavepointInfo(),
conf);
assertEquals(
"savepoint_trigger_2",
deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
assertTrue(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
observer.observe(deployment, readyContext);
observer.observe(deployment, readyContext);
assertEquals(
"savepoint_1",
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.getLastSavepoint()
.getLocation());
assertEquals(
secondNonce,
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.getLastSavepoint()
.getTriggerNonce());
assertFalse(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
// application failure after checkpoint trigger
Long thirdNonce = 789L;
deployment.getSpec().getJob().setSavepointTriggerNonce(thirdNonce);
flinkService.triggerSavepoint(
deployment.getStatus().getJobStatus().getJobId(),
SnapshotTriggerType.MANUAL,
deployment.getStatus().getJobStatus().getSavepointInfo(),
conf);
assertEquals(
"savepoint_trigger_3",
deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
assertTrue(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
flinkService.setPortReady(false);
observer.observe(deployment, readyContext);
assertEquals(
"savepoint_1",
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.getLastSavepoint()
.getLocation());
assertEquals(
secondNonce,
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.getLastSavepoint()
.getTriggerNonce());
assertFalse(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
assertEquals(
1,
kubernetesClient
.v1()
.events()
.inNamespace(deployment.getMetadata().getNamespace())
.list()
.getItems()
.stream()
.filter(e -> e.getReason().contains("SavepointError"))
.filter(
e ->
e.getMessage()
.equals(
"Savepoint failed for savepointTriggerNonce: "
+ thirdNonce))
.count());
assertEquals(
1,
kubernetesClient
.v1()
.events()
.inNamespace(deployment.getMetadata().getNamespace())
.list()
.getItems()
.stream()
.filter(e -> e.getReason().contains("SavepointError"))
.filter(
e ->
e.getMessage()
.equals(
"Savepoint failed for savepointTriggerNonce: "
+ thirdNonce))
.collect(Collectors.toList())
.get(0)
.getCount());
flinkService.setPortReady(true);
observer.observe(deployment, readyContext);
// Simulate Failed job
var jobTuple = flinkService.listJobs().get(0);
jobTuple.f0 = "last-SP";
jobTuple.f1 =
new JobStatusMessage(
jobTuple.f1.getJobId(),
jobTuple.f1.getJobName(),
org.apache.flink.api.common.JobStatus.FAILED,
jobTuple.f1.getStartTime());
deployment.getStatus().getJobStatus().getSavepointInfo().setTriggerId("test");
deployment.getStatus().getJobStatus().getSavepointInfo().setTriggerTimestamp(123L);
observer.observe(deployment, readyContext);
assertEquals(
org.apache.flink.api.common.JobStatus.FAILED.name(),
deployment.getStatus().getJobStatus().getState());
assertEquals(
"last-SP",
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.getLastSavepoint()
.getLocation());
assertFalse(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
observer.observe(deployment, readyContext);
assertEquals(
3,
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.getSavepointHistory()
.size());
deployment
.getSpec()
.getFlinkConfiguration()
.put(
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT.key(),
"1");
observer.observe(deployment, readyContext);
assertEquals(
1,
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.getSavepointHistory()
.size());
}
@Test
public void observeCheckpoint() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Long timedOutNonce = 1L;
final var errorReason = EventRecorder.Reason.CheckpointError.name();
final var namespace = deployment.getMetadata().getNamespace();
final var timedOutMessage =
"Checkpoint failed for checkpointTriggerNonce: " + timedOutNonce;
deployment.getSpec().getJob().setCheckpointTriggerNonce(timedOutNonce);
Configuration conf =
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
bringToReadyStatus(deployment);
assertTrue(ReconciliationUtils.isJobRunning(deployment.getStatus()));
flinkService.triggerCheckpoint(
deployment.getStatus().getJobStatus().getJobId(),
SnapshotTriggerType.MANUAL,
getCheckpointInfo(deployment),
conf);
// pending checkpoint
assertEquals("checkpoint_trigger_0", getCheckpointInfo(deployment).getTriggerId());
observer.observe(deployment, readyContext);
assertTrue(SnapshotUtils.checkpointInProgress(deployment.getStatus().getJobStatus()));
getCheckpointInfo(deployment).setTriggerId("unknown");
// checkpoint error within grace period
assertEquals(0, countErrorEvents(errorReason, namespace, timedOutMessage));
observer.observe(deployment, readyContext);
assertFalse(SnapshotUtils.checkpointInProgress(deployment.getStatus().getJobStatus()));
assertEquals(1, countErrorEvents(errorReason, namespace, timedOutMessage));
getCheckpointInfo(deployment).setTriggerId("unknown");
deployment
.getStatus()
.getJobStatus()
.getCheckpointInfo()
.setTriggerType(SnapshotTriggerType.MANUAL);
deployment
.getStatus()
.getJobStatus()
.getCheckpointInfo()
.setTriggerTimestamp(Instant.now().minus(Duration.ofHours(1)).toEpochMilli());
observer.observe(deployment, readyContext);
assertFalse(SnapshotUtils.checkpointInProgress(deployment.getStatus().getJobStatus()));
assertEquals(2, countErrorEvents(errorReason, namespace, timedOutMessage));
// checkpoint success
Long firstNonce = 123L;
deployment.getSpec().getJob().setCheckpointTriggerNonce(firstNonce);
flinkService.triggerCheckpoint(
deployment.getStatus().getJobStatus().getJobId(),
SnapshotTriggerType.MANUAL,
getCheckpointInfo(deployment),
conf);
assertEquals("checkpoint_trigger_1", getCheckpointInfo(deployment).getTriggerId());
observer.observe(deployment, readyContext);
observer.observe(deployment, readyContext);
assertEquals(
firstNonce, getCheckpointInfo(deployment).getLastCheckpoint().getTriggerNonce());
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(deployment)));
// second attempt success
Long secondNonce = 456L;
deployment.getSpec().getJob().setCheckpointTriggerNonce(secondNonce);
flinkService.triggerCheckpoint(
getJobStatus(deployment).getJobId(),
SnapshotTriggerType.MANUAL,
getCheckpointInfo(deployment),
conf);
assertEquals("checkpoint_trigger_2", getCheckpointInfo(deployment).getTriggerId());
assertTrue(SnapshotUtils.checkpointInProgress(getJobStatus(deployment)));
observer.observe(deployment, readyContext);
observer.observe(deployment, readyContext);
assertEquals(
secondNonce, getCheckpointInfo(deployment).getLastCheckpoint().getTriggerNonce());
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(deployment)));
// application failure after checkpoint trigger
Long thirdNonce = 789L;
deployment.getSpec().getJob().setCheckpointTriggerNonce(thirdNonce);
flinkService.triggerCheckpoint(
getJobStatus(deployment).getJobId(),
SnapshotTriggerType.MANUAL,
getCheckpointInfo(deployment),
conf);
assertEquals("checkpoint_trigger_3", getCheckpointInfo(deployment).getTriggerId());
assertTrue(SnapshotUtils.checkpointInProgress(getJobStatus(deployment)));
flinkService.setPortReady(false);
observer.observe(deployment, readyContext);
assertEquals(
secondNonce, getCheckpointInfo(deployment).getLastCheckpoint().getTriggerNonce());
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(deployment)));
final var thirdNonceMessage = "Checkpoint failed for checkpointTriggerNonce: " + thirdNonce;
assertEquals(1, countErrorEvents(errorReason, namespace, thirdNonceMessage));
flinkService.setPortReady(true);
observer.observe(deployment, readyContext);
// Simulate Failed job
var jobTuple = flinkService.listJobs().get(0);
jobTuple.f0 = "last-SP";
jobTuple.f1 =
new JobStatusMessage(
jobTuple.f1.getJobId(),
jobTuple.f1.getJobName(),
org.apache.flink.api.common.JobStatus.FAILED,
jobTuple.f1.getStartTime());
getCheckpointInfo(deployment).setTriggerId("test");
getCheckpointInfo(deployment).setTriggerTimestamp(123L);
observer.observe(deployment, readyContext);
assertEquals(
org.apache.flink.api.common.JobStatus.FAILED.name(),
getJobStatus(deployment).getState());
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(deployment)));
}
private Integer countErrorEvents(String errorReason, String namespace, String timedOutMessage) {
return kubernetesClient.v1().events().inNamespace(namespace).list().getItems().stream()
.filter(e -> e.getReason().contains(errorReason))
.filter(e -> e.getMessage().equals(timedOutMessage))
.map(Event::getCount)
.findFirst()
.orElse(0);
}
@Test
public void testSavepointFormat() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Configuration conf =
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
bringToReadyStatus(deployment);
assertTrue(ReconciliationUtils.isJobRunning(deployment.getStatus()));
// canonical savepoint
Long firstNonce = 123L;
deployment.getSpec().getJob().setSavepointTriggerNonce(firstNonce);
flinkService.triggerSavepoint(
deployment.getStatus().getJobStatus().getJobId(),
SnapshotTriggerType.MANUAL,
deployment.getStatus().getJobStatus().getSavepointInfo(),
conf);
observer.observe(deployment, readyContext);
observer.observe(deployment, readyContext);
assertFalse(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
assertEquals(
firstNonce,
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.getLastSavepoint()
.getTriggerNonce());
assertEquals(
SavepointFormatType.CANONICAL,
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.getLastSavepoint()
.getFormatType());
// native savepoint
Long secondNonce = 456L;
deployment.getSpec().getJob().setSavepointTriggerNonce(secondNonce);
deployment
.getSpec()
.setFlinkConfiguration(
Map.of(
OPERATOR_SAVEPOINT_FORMAT_TYPE.key(),
org.apache.flink.core.execution.SavepointFormatType.NATIVE.name()));
conf = configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
flinkService.triggerSavepoint(
deployment.getStatus().getJobStatus().getJobId(),
SnapshotTriggerType.MANUAL,
deployment.getStatus().getJobStatus().getSavepointInfo(),
conf);
observer.observe(deployment, readyContext);
observer.observe(deployment, readyContext);
assertFalse(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
assertEquals(
secondNonce,
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.getLastSavepoint()
.getTriggerNonce());
assertEquals(
SavepointFormatType.NATIVE,
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.getLastSavepoint()
.getFormatType());
}
private void bringToReadyStatus(FlinkDeployment deployment) {
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
JobStatus jobStatus = new JobStatus();
jobStatus.setJobName("jobname");
jobStatus.setJobId("0000000000");
jobStatus.setState(JobState.RUNNING.name());
deployment.getStatus().setJobStatus(jobStatus);
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
}
@Test
public void observeListJobsError() {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
bringToReadyStatus(deployment);
observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
// simulate deployment failure
String podFailedMessage = "list jobs error";
flinkService.setPodList(
TestUtils.createFailedPodList(
podFailedMessage, DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF));
flinkService.setPortReady(false);
Exception exception =
assertThrows(
DeploymentFailedException.class,
() ->
observer.observe(
deployment,
TestUtils.createContextWithInProgressDeployment(
kubernetesClient)));
assertEquals(podFailedMessage, exception.getMessage());
}
@Test
public void observeAlreadyUpgraded() {
var kubernetesDeployment = TestUtils.createDeployment(true);
kubernetesDeployment.getMetadata().setAnnotations(new HashMap<>());
var context = TestUtils.createContextWithDeployment(kubernetesDeployment, kubernetesClient);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
deployment.getMetadata().setGeneration(123L);
var status = deployment.getStatus();
var reconStatus = status.getReconciliationStatus();
// New deployment
ReconciliationUtils.updateStatusBeforeDeploymentAttempt(
deployment,
new FlinkConfigManager(new Configuration())
.getDeployConfig(deployment.getMetadata(), deployment.getSpec()));
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
// Test regular upgrades
deployment.getSpec().getJob().setParallelism(5);
deployment.getMetadata().setGeneration(321L);
ReconciliationUtils.updateStatusBeforeDeploymentAttempt(
deployment,
new FlinkConfigManager(new Configuration())
.getDeployConfig(deployment.getMetadata(), deployment.getSpec()));
status = deployment.getStatus();
reconStatus = status.getReconciliationStatus();
assertEquals(status.getReconciliationStatus().getState(), ReconciliationState.UPGRADING);
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
// Kubernetes Deployment is not there yet
observer.observe(deployment, TestUtils.createEmptyContext());
assertEquals(JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());
// Kubernetes Deployment is there but without the correct label
observer.observe(deployment, context);
assertEquals(JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());
// We set the correct generation label on the kubernetes deployment
kubernetesDeployment
.getMetadata()
.getAnnotations()
.put(FlinkUtils.CR_GENERATION_LABEL, "321");
deployment.getMetadata().setGeneration(322L);
deployment.getSpec().getJob().setParallelism(4);
// Simulate marked for deletion, make sure we don't recognize this as a valid deployment
kubernetesDeployment.getMetadata().setDeletionTimestamp(Instant.now().toString());
observer.observe(deployment, context);
assertEquals(ReconciliationState.UPGRADING, reconStatus.getState());
assertEquals(JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());
// Reset deletion flag
kubernetesDeployment.getMetadata().setDeletionTimestamp(null);
// Simulate non-missing deployment, this happens in the middle of savepoint upgrades
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
observer.observe(deployment, context);
assertEquals(ReconciliationState.UPGRADING, reconStatus.getState());
assertEquals(JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus());
// Reset to missing
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
// Deployment is missing and kubernetes deployment matches the target generation
// should be recognized as deployed
observer.observe(deployment, context);
assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());
assertEquals(
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
status.getJobManagerDeploymentStatus());
var specWithMeta = status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
assertEquals(321L, specWithMeta.getMeta().getMetadata().getGeneration());
assertEquals(JobState.RUNNING, specWithMeta.getSpec().getJob().getState());
assertEquals(5, specWithMeta.getSpec().getJob().getParallelism());
}
@Test
public void observeAlreadyScaled() {
var deployment = TestUtils.buildApplicationCluster();
// Update status for for running job
ReconciliationUtils.updateStatusBeforeDeploymentAttempt(
deployment,
new FlinkConfigManager(new Configuration())
.getDeployConfig(deployment.getMetadata(), deployment.getSpec()));
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
assertEquals(
ReconciliationState.DEPLOYED,
deployment.getStatus().getReconciliationStatus().getState());
var conf = new Configuration();
var v1 = new JobVertexID();
conf.set(PipelineOptions.PARALLELISM_OVERRIDES, Map.of(v1.toHexString(), "2"));
deployment.getSpec().setFlinkConfiguration(conf.toMap());
// Assert that we move to deployed when in deprecated scaling UPGRADING state
ReconciliationUtils.updateStatusForSpecReconciliation(
deployment, JobState.RUNNING, conf, true, Clock.systemDefaultZone());
assertEquals(
ReconciliationState.UPGRADING,
deployment.getStatus().getReconciliationStatus().getState());
observer.observe(deployment, context);
assertEquals(
ReconciliationState.DEPLOYED,
deployment.getStatus().getReconciliationStatus().getState());
}
@Test
public void validateLastReconciledClearedOnInitialFailure() {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
deployment.getMetadata().setGeneration(123L);
ReconciliationUtils.updateStatusBeforeDeploymentAttempt(
deployment,
new FlinkConfigManager(new Configuration())
.getDeployConfig(deployment.getMetadata(), deployment.getSpec()));
var reconStatus = deployment.getStatus().getReconciliationStatus();
assertTrue(
reconStatus.deserializeLastReconciledSpecWithMeta().getMeta().isFirstDeployment());
assertFalse(reconStatus.isBeforeFirstDeployment());
observer.observe(deployment, TestUtils.createEmptyContext());
assertTrue(reconStatus.isBeforeFirstDeployment());
}
}