blob: 394be6e80aee46487058f0a30bf27f78e40cf5a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.autoscaler.NoopJobAutoscaler;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.Checkpoint;
import org.apache.flink.kubernetes.operator.api.status.CheckpointInfo;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
import org.apache.flink.kubernetes.operator.service.NativeFlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SnapshotStatus;
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
import org.apache.flink.util.concurrent.Executors;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import lombok.Getter;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.platform.commons.util.StringUtils;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import static org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getCheckpointInfo;
import static org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getJobSpec;
import static org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getJobStatus;
import static org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getReconciledJobSpec;
import static org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getReconciledJobState;
import static org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getSavepointInfo;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT;
import static org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.MSG_SUBMIT;
import static org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.MSG_RECOVERY;
import static org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.MSG_RESTART_UNHEALTHY;
import static org.apache.flink.kubernetes.operator.utils.SnapshotUtils.getLastSnapshotStatus;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/** {@link ApplicationReconciler} unit tests. */
@EnableKubernetesMockClient(crud = true)
public class ApplicationReconcilerTest extends OperatorTestBase {
private TestReconcilerAdapter<FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus>
reconciler;
@Getter private KubernetesClient kubernetesClient;
private ApplicationReconciler appReconciler;
private FlinkOperatorConfiguration operatorConfig;
private ExecutorService executorService;
private Clock testClock = Clock.systemDefaultZone();
@Override
public void setup() {
appReconciler =
new ApplicationReconciler(eventRecorder, statusRecorder, new NoopJobAutoscaler<>());
reconciler = new TestReconcilerAdapter<>(this, appReconciler);
operatorConfig = configManager.getOperatorConfiguration();
executorService = Executors.newDirectExecutorService();
}
@ParameterizedTest
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
public void testSubmitAndCleanUpWithSavepoint(FlinkVersion flinkVersion) throws Exception {
var conf = configManager.getDefaultConfig();
conf.set(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION, true);
configManager.updateDefaultConfig(conf);
FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
// session ready
reconciler.reconcile(
deployment, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
// clean up
assertEquals(
null, deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
reconciler.cleanup(
deployment, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
assertEquals(
"savepoint_0",
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.getLastSavepoint()
.getLocation());
}
@ParameterizedTest
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
public void testSubmitAndCleanUpWithSavepointOnResource(FlinkVersion flinkVersion)
throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
deployment
.getSpec()
.getFlinkConfiguration()
.put(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION.key(), "true");
// session ready
reconciler.reconcile(
deployment, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
// clean up
assertEquals(
null, deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
reconciler.cleanup(
deployment, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
assertEquals(
"savepoint_0",
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.getLastSavepoint()
.getLocation());
}
@ParameterizedTest
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
reconciler.reconcile(deployment, context);
var runningJobs = flinkService.listJobs();
verifyAndSetRunningJobsToStatus(deployment, runningJobs);
assertTrue(
deployment
.getStatus()
.getReconciliationStatus()
.deserializeLastReconciledSpecWithMeta()
.getMeta()
.isFirstDeployment());
JobID jobId = runningJobs.get(0).f1.getJobId();
verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
// Test stateless upgrade
FlinkDeployment statelessUpgrade = ReconciliationUtils.clone(deployment);
getJobSpec(statelessUpgrade).setUpgradeMode(UpgradeMode.STATELESS);
statelessUpgrade.getSpec().getFlinkConfiguration().put("new", "conf");
reconciler.reconcile(statelessUpgrade, context);
assertFalse(
statelessUpgrade
.getStatus()
.getReconciliationStatus()
.deserializeLastReconciledSpecWithMeta()
.getMeta()
.isFirstDeployment());
assertEquals(0, flinkService.getRunningCount());
reconciler.reconcile(statelessUpgrade, context);
assertFalse(
statelessUpgrade
.getStatus()
.getReconciliationStatus()
.deserializeLastReconciledSpecWithMeta()
.getMeta()
.isFirstDeployment());
assertEquals(null, getLastSnapshotStatus(statelessUpgrade, SAVEPOINT));
runningJobs = flinkService.listJobs();
assertEquals(1, flinkService.getRunningCount());
assertNull(runningJobs.get(0).f0);
assertNotEquals(
runningJobs.get(0).f2.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID), jobId);
jobId = runningJobs.get(0).f1.getJobId();
getJobStatus(deployment).setJobId(jobId.toHexString());
// Test stateful upgrade
FlinkDeployment statefulUpgrade = ReconciliationUtils.clone(deployment);
getJobSpec(statefulUpgrade).setUpgradeMode(UpgradeMode.SAVEPOINT);
statefulUpgrade.getSpec().getFlinkConfiguration().put("new", "conf2");
reconciler.reconcile(statefulUpgrade, context);
assertEquals(0, flinkService.getRunningCount());
reconciler.reconcile(statefulUpgrade, context);
runningJobs = flinkService.listJobs();
assertEquals(1, flinkService.getRunningCount());
assertEquals("savepoint_0", runningJobs.get(0).f0);
assertEquals(
SnapshotTriggerType.UPGRADE,
getSavepointInfo(statefulUpgrade).getLastSavepoint().getTriggerType());
assertEquals(SnapshotStatus.SUCCEEDED, getLastSnapshotStatus(statefulUpgrade, SAVEPOINT));
verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
getJobSpec(deployment).setUpgradeMode(UpgradeMode.LAST_STATE);
deployment.getSpec().setRestartNonce(100L);
deployment
.getStatus()
.getReconciliationStatus()
.setLastStableSpec(
deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
flinkService.setHaDataAvailable(false);
getJobStatus(deployment).setState("RECONCILING");
try {
deployment
.getStatus()
.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
reconciler.reconcile(deployment, context);
fail();
} catch (RecoveryFailureException expected) {
}
try {
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
reconciler.reconcile(deployment, context);
fail();
} catch (RecoveryFailureException expected) {
}
flinkService.clear();
getJobSpec(deployment).setUpgradeMode(UpgradeMode.LAST_STATE);
deployment.getSpec().setRestartNonce(200L);
flinkService.setHaDataAvailable(false);
getSavepointInfo(deployment)
.setLastSavepoint(Savepoint.of("finished_sp", SnapshotTriggerType.UPGRADE));
getJobStatus(deployment).setState("FINISHED");
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconciler.reconcile(deployment, context);
reconciler.reconcile(deployment, context);
assertEquals(1, flinkService.getRunningCount());
assertEquals("finished_sp", runningJobs.get(0).f0);
verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
}
private void verifyJobId(
FlinkDeployment deployment, JobStatusMessage status, Configuration conf, JobID jobId) {
// jobId set by operator
assertEquals(jobId, status.getJobId());
assertEquals(conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID), jobId.toHexString());
}
@NotNull
private static Savepoint savepointFromSavepointInfo(
SavepointInfo savepointInfo, Long savepointTriggerNonce) {
return new Savepoint(
savepointInfo.getTriggerTimestamp(),
// To make sure the new savepoint has a new path
savepointInfo.getTriggerId()
+ savepointInfo.getTriggerTimestamp()
+ savepointInfo.getTriggerId()
+ savepointTriggerNonce,
savepointInfo.getTriggerType(),
savepointInfo.getFormatType(),
SnapshotTriggerType.MANUAL == savepointInfo.getTriggerType()
? savepointTriggerNonce
: null);
}
@Test
public void triggerCheckpoint() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
testSnapshot(deployment, CHECKPOINT);
}
@Test
public void triggerSavepoint() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
testSnapshot(deployment, SAVEPOINT);
}
@Test
public void verifyStatusUpdatedBeforeDeploy() throws Exception {
// Bootstrap running deployment status
var deployment = TestUtils.buildApplicationCluster(FlinkVersion.v1_17);
deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
reconciler.reconcile(deployment, context);
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
// Suspend job and make sure deployment is not deleted (savepoint upgrade)
deployment.getSpec().getJob().setState(JobState.SUSPENDED);
reconciler.reconcile(deployment, context);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
// Resume but trigger deploy failure
deployment.getSpec().getJob().setState(JobState.RUNNING);
flinkService.setDeployFailure(true);
try {
reconciler.reconcile(deployment, context);
fail();
} catch (Exception expected) {
// Make sure deployment deletion is already persisted in k8s
deployment.getStatus().setJobManagerDeploymentStatus(null);
statusRecorder.updateStatusFromCache(deployment);
assertEquals(
JobManagerDeploymentStatus.MISSING,
deployment.getStatus().getJobManagerDeploymentStatus());
}
}
private void testSnapshot(FlinkDeployment deployment, SnapshotType snapshotType)
throws Exception {
final Predicate<JobStatus> isSnapshotInProgress;
final Function<FlinkDeployment, SnapshotInfo> getSnapshotInfo;
final BiConsumer<JobSpec, Long> setTriggerNonce;
final Function<JobSpec, Long> getTriggerNonce;
final Consumer<FlinkDeployment> updateLastSnapshot;
final BiConsumer<FlinkDeployment, Long> setLastSnapshotTime;
final ConfigOption<String> triggerSnapshotExpression;
final String triggerPrefix;
switch (snapshotType) {
case SAVEPOINT:
isSnapshotInProgress = SnapshotUtils::savepointInProgress;
getSnapshotInfo = FlinkResourceUtils::getSavepointInfo;
setTriggerNonce = JobSpec::setSavepointTriggerNonce;
getTriggerNonce = JobSpec::getSavepointTriggerNonce;
updateLastSnapshot =
flinkDeployment -> {
var savepointInfo = getSavepointInfo(flinkDeployment);
var savepoint =
savepointFromSavepointInfo(
savepointInfo,
getJobSpec(flinkDeployment).getSavepointTriggerNonce());
savepointInfo.updateLastSavepoint(savepoint);
};
setLastSnapshotTime =
(flinkDeployment, timestamp) -> {
Savepoint lastSavepoint =
Savepoint.of("", timestamp, SnapshotTriggerType.PERIODIC);
flinkDeployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.updateLastSavepoint(lastSavepoint);
};
triggerSnapshotExpression =
KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL;
triggerPrefix = "savepoint_";
break;
case CHECKPOINT:
isSnapshotInProgress = SnapshotUtils::checkpointInProgress;
getSnapshotInfo = FlinkResourceUtils::getCheckpointInfo;
setTriggerNonce = JobSpec::setCheckpointTriggerNonce;
getTriggerNonce = JobSpec::getCheckpointTriggerNonce;
updateLastSnapshot =
flinkDeployment -> {
var checkpointInfo = getCheckpointInfo(flinkDeployment);
var checkpoint =
checkpointFromCheckpointInfo(
checkpointInfo,
getJobSpec(flinkDeployment)
.getCheckpointTriggerNonce());
checkpointInfo.updateLastCheckpoint(checkpoint);
};
setLastSnapshotTime =
(flinkDeployment, timestamp) -> {
Checkpoint lastCheckpoint =
Checkpoint.of(timestamp, SnapshotTriggerType.PERIODIC);
flinkDeployment
.getStatus()
.getJobStatus()
.getCheckpointInfo()
.updateLastCheckpoint(lastCheckpoint);
};
triggerSnapshotExpression =
KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL;
triggerPrefix = "checkpoint_";
break;
default:
throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
}
reconciler.reconcile(deployment, context);
var runningJobs = flinkService.listJobs();
verifyAndSetRunningJobsToStatus(deployment, runningJobs);
assertFalse(isSnapshotInProgress.test(getJobStatus(deployment)));
assertNull(getSnapshotInfo.apply(deployment).getLastSnapshot());
assertNull(getLastSnapshotStatus(deployment, snapshotType));
FlinkDeployment snDeployment = ReconciliationUtils.clone(deployment);
// don't trigger if nonce is missing
reconciler.reconcile(snDeployment, context);
assertFalse(isSnapshotInProgress.test((getJobStatus(snDeployment))));
assertNull(getSnapshotInfo.apply(deployment).getLastSnapshot());
assertNull(getLastSnapshotStatus(snDeployment, snapshotType));
// trigger when nonce is defined
setTriggerNonce.accept(getJobSpec(snDeployment), ThreadLocalRandom.current().nextLong());
reconciler.reconcile(snDeployment, context);
assertNull(getTriggerNonce.apply(getReconciledJobSpec(snDeployment)));
assertEquals(
triggerPrefix + "trigger_0", getSnapshotInfo.apply(snDeployment).getTriggerId());
assertTrue(isSnapshotInProgress.test(getJobStatus(snDeployment)));
assertEquals(SnapshotStatus.PENDING, getLastSnapshotStatus(snDeployment, snapshotType));
// don't trigger when snapshot is in progress
reconciler.reconcile(snDeployment, context);
assertEquals(
triggerPrefix + "trigger_0", getSnapshotInfo.apply(snDeployment).getTriggerId());
assertEquals(
SnapshotTriggerType.MANUAL, getSnapshotInfo.apply(snDeployment).getTriggerType());
ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
getSnapshotInfo.apply(snDeployment), snDeployment, snapshotType);
getSnapshotInfo.apply(snDeployment).resetTrigger();
// don't trigger when nonce is the same
reconciler.reconcile(snDeployment, context);
assertFalse(isSnapshotInProgress.test(getJobStatus(snDeployment)));
assertNull(getLastSnapshotStatus(snDeployment, snapshotType));
// trigger when new nonce is defined
setTriggerNonce.accept(getJobSpec(snDeployment), ThreadLocalRandom.current().nextLong());
reconciler.reconcile(snDeployment, context);
assertEquals(
triggerPrefix + "trigger_1", getSnapshotInfo.apply(snDeployment).getTriggerId());
assertEquals(
SnapshotTriggerType.MANUAL, getSnapshotInfo.apply(snDeployment).getTriggerType());
// re-trigger after reset
getSnapshotInfo.apply(snDeployment).resetTrigger();
reconciler.reconcile(snDeployment, context);
assertEquals(
triggerPrefix + "trigger_2", getSnapshotInfo.apply(snDeployment).getTriggerId());
assertEquals(
SnapshotTriggerType.MANUAL, getSnapshotInfo.apply(snDeployment).getTriggerType());
// reconciled and snapshot is updated
ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
getSnapshotInfo.apply(snDeployment), snDeployment, snapshotType);
updateLastSnapshot.accept(snDeployment);
assertEquals(SnapshotStatus.SUCCEEDED, getLastSnapshotStatus(snDeployment, snapshotType));
// re-trigger, reconciled but snapshot is not updated
setTriggerNonce.accept(getJobSpec(snDeployment), ThreadLocalRandom.current().nextLong());
reconciler.reconcile(snDeployment, context);
ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
getSnapshotInfo.apply(snDeployment), snDeployment, snapshotType);
getSnapshotInfo.apply(snDeployment).resetTrigger();
assertEquals(SnapshotStatus.ABANDONED, getLastSnapshotStatus(snDeployment, snapshotType));
// don't trigger when nonce is cleared
setTriggerNonce.accept(getJobSpec(snDeployment), null);
reconciler.reconcile(snDeployment, context);
assertFalse(isSnapshotInProgress.test(getJobStatus(snDeployment)));
// trigger by periodic interval settings
snDeployment.getSpec().getFlinkConfiguration().put(triggerSnapshotExpression.key(), "1");
reconciler.reconcile(snDeployment, context);
assertTrue(isSnapshotInProgress.test(getJobStatus(snDeployment)));
assertEquals(SnapshotStatus.PENDING, getLastSnapshotStatus(snDeployment, snapshotType));
snDeployment.getSpec().getFlinkConfiguration().put(triggerSnapshotExpression.key(), "0");
// trigger by cron expression
updateLastSnapshot.accept(snDeployment); // Ensures no snapshot is considered to be running
assertFalse(isSnapshotInProgress.test(getJobStatus(snDeployment)));
assertNotEquals(SnapshotStatus.PENDING, getLastSnapshotStatus(snDeployment, snapshotType));
Calendar calendar = Calendar.getInstance();
calendar.set(2022, Calendar.JUNE, 5, 11, 0);
setLastSnapshotTime.accept(
snDeployment, calendar.getTimeInMillis()); // Required for the cron to trigger
snDeployment
.getSpec()
.getFlinkConfiguration()
.put(triggerSnapshotExpression.key(), "0 0 12 5 6 ? 2022");
reconciler.reconcile(snDeployment, context);
assertTrue(isSnapshotInProgress.test(getJobStatus(snDeployment)));
assertEquals(SnapshotStatus.PENDING, getLastSnapshotStatus(snDeployment, snapshotType));
snDeployment
.getSpec()
.getFlinkConfiguration()
.put(triggerSnapshotExpression.key(), triggerSnapshotExpression.defaultValue());
}
@NotNull
private static Checkpoint checkpointFromCheckpointInfo(
CheckpointInfo checkpointInfo, Long checkpointTriggerNonce) {
return new Checkpoint(
checkpointInfo.getTriggerTimestamp(),
checkpointInfo.getTriggerType(),
checkpointInfo.getFormatType(),
SnapshotTriggerType.MANUAL == checkpointInfo.getTriggerType()
? checkpointTriggerNonce
: null);
}
@Test
public void triggerRestart() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
reconciler.reconcile(deployment, context);
var runningJobs = flinkService.listJobs();
verifyAndSetRunningJobsToStatus(deployment, runningJobs);
// Test restart job
FlinkDeployment restartJob = ReconciliationUtils.clone(deployment);
restartJob.getSpec().setRestartNonce(1L);
reconciler.reconcile(restartJob, context);
assertEquals(JobState.SUSPENDED, getReconciledJobState(restartJob));
assertEquals(0, flinkService.getRunningCount());
reconciler.reconcile(restartJob, context);
assertEquals(JobState.RUNNING, getReconciledJobState(restartJob));
assertEquals(1, flinkService.getRunningCount());
assertEquals(
1L,
restartJob
.getStatus()
.getReconciliationStatus()
.deserializeLastReconciledSpec()
.getRestartNonce());
}
private void verifyAndSetRunningJobsToStatus(
FlinkDeployment deployment,
List<Tuple3<String, JobStatusMessage, Configuration>> runningJobs) {
assertEquals(1, runningJobs.size());
assertNull(runningJobs.get(0).f0);
deployment
.getStatus()
.setJobStatus(
new JobStatus()
.toBuilder()
.jobId(runningJobs.get(0).f1.getJobId().toHexString())
.jobName(runningJobs.get(0).f1.getJobName())
.updateTime(Long.toString(System.currentTimeMillis()))
.state("RUNNING")
.build());
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
}
@Test
public void testJobUpgradeIgnorePendingSavepoint() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
reconciler.reconcile(deployment, context);
var runningJobs = flinkService.listJobs();
verifyAndSetRunningJobsToStatus(deployment, runningJobs);
FlinkDeployment spDeployment = ReconciliationUtils.clone(deployment);
getJobSpec(spDeployment).setSavepointTriggerNonce(ThreadLocalRandom.current().nextLong());
reconciler.reconcile(spDeployment, context);
assertEquals("savepoint_trigger_0", getSavepointInfo(spDeployment).getTriggerId());
assertEquals(JobState.RUNNING.name(), getJobStatus(spDeployment).getState());
// Force upgrade when savepoint is in progress.
spDeployment
.getSpec()
.getFlinkConfiguration()
.put(
KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT.key(),
"true");
spDeployment.getSpec().setImage("flink:greatest");
reconciler.reconcile(spDeployment, context);
assertEquals("savepoint_trigger_0", getSavepointInfo(spDeployment).getTriggerId());
assertEquals(
org.apache.flink.api.common.JobStatus.FINISHED.name(),
getJobStatus(spDeployment).getState());
}
@Test
public void testRandomJobResultStorePath() throws Exception {
FlinkDeployment flinkApp = TestUtils.buildApplicationCluster();
final String haStoragePath = "file:///flink-data/ha";
flinkApp.getSpec()
.getFlinkConfiguration()
.put(HighAvailabilityOptions.HA_STORAGE_PATH.key(), haStoragePath);
ObjectMeta deployMeta = flinkApp.getMetadata();
FlinkDeploymentStatus status = flinkApp.getStatus();
FlinkDeploymentSpec spec = flinkApp.getSpec();
Configuration deployConfig = configManager.getDeployConfig(deployMeta, spec);
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconciler
.getReconciler()
.deploy(getResourceContext(flinkApp), spec, deployConfig, Optional.empty(), false);
String path1 = deployConfig.get(JobResultStoreOptions.STORAGE_PATH);
Assertions.assertTrue(path1.startsWith(haStoragePath));
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconciler
.getReconciler()
.deploy(getResourceContext(flinkApp), spec, deployConfig, Optional.empty(), false);
String path2 = deployConfig.get(JobResultStoreOptions.STORAGE_PATH);
Assertions.assertTrue(path2.startsWith(haStoragePath));
assertNotEquals(path1, path2);
}
@Test
public void testAlwaysSavepointOnFlinkVersionChange() throws Exception {
var deployment = TestUtils.buildApplicationCluster(FlinkVersion.v1_14);
getJobSpec(deployment).setUpgradeMode(UpgradeMode.LAST_STATE);
reconciler.reconcile(deployment, context);
deployment.getSpec().setFlinkVersion(FlinkVersion.v1_15);
var reconStatus = deployment.getStatus().getReconciliationStatus();
// Do not trigger update until running
reconciler.reconcile(deployment, context);
assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());
getJobStatus(deployment).setState(JobState.RUNNING.name());
getJobStatus(deployment)
.setJobId(flinkService.listJobs().get(0).f1.getJobId().toHexString());
reconciler.reconcile(deployment, context);
assertEquals(ReconciliationState.UPGRADING, reconStatus.getState());
assertEquals(
UpgradeMode.SAVEPOINT,
reconStatus.deserializeLastReconciledSpec().getJob().getUpgradeMode());
}
@Test
public void testScaleWithReactiveModeDisabled() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
reconciler.reconcile(deployment, context);
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
getJobSpec(deployment).setParallelism(100);
reconciler.reconcile(deployment, context);
assertEquals(JobState.SUSPENDED, getReconciledJobState(deployment));
}
@Test
public void testScaleWithReactiveModeEnabled() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
deployment.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
deployment
.getSpec()
.getFlinkConfiguration()
.put(
JobManagerOptions.SCHEDULER_MODE.key(),
SchedulerExecutionMode.REACTIVE.name());
reconciler.reconcile(deployment, context);
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
// the default.parallelism is always ignored
deployment
.getSpec()
.getFlinkConfiguration()
.put(CoreOptions.DEFAULT_PARALLELISM.key(), "100");
reconciler.reconcile(deployment, context);
assertEquals(JobState.RUNNING, getReconciledJobState(deployment));
assertEquals(0, flinkService.getDesiredReplicas());
getJobSpec(deployment).setParallelism(4);
reconciler.reconcile(deployment, context);
assertEquals(JobState.RUNNING, getReconciledJobState(deployment));
assertEquals(2, flinkService.getDesiredReplicas());
getJobSpec(deployment).setParallelism(8);
reconciler.reconcile(deployment, context);
assertEquals(JobState.RUNNING, getReconciledJobState(deployment));
assertEquals(4, flinkService.getDesiredReplicas());
}
    /**
     * Verifies that a parallelism-override change is applied through the rescale API (vertex
     * resource requirements update) instead of a full job upgrade. The spec is configured with
     * everything the rescale path requires: Flink 1.18, native deployment mode and the adaptive
     * scheduler.
     */
    @Test
    public void testScaleWithRescaleApi() throws Exception {
        // Counts how many times the mocked rescale endpoint is hit
        var rescaleCounter = new AtomicInteger(0);
        var v1 = new JobVertexID();
        // We create a service mocking out some methods we don't want to call explicitly
        var nativeService =
                new NativeFlinkService(
                        kubernetesClient, null, executorService, operatorConfig, eventRecorder) {
                    // Vertex resource requirements last "submitted" to the (mock) cluster;
                    // starts with parallelism 1 for the single test vertex
                    Map<JobVertexID, JobVertexResourceRequirements> submitted =
                            Map.of(
                                    v1,
                                    new JobVertexResourceRequirements(
                                            new JobVertexResourceRequirements.Parallelism(1, 1)));

                    @Override
                    protected Map<JobVertexID, JobVertexResourceRequirements> getVertexResources(
                            RestClusterClient<String> c, AbstractFlinkResource<?, ?> r) {
                        return submitted;
                    }

                    @Override
                    protected void updateVertexResources(
                            RestClusterClient<String> c,
                            AbstractFlinkResource<?, ?> r,
                            Map<JobVertexID, JobVertexResourceRequirements> req) {
                        submitted = req;
                        rescaleCounter.incrementAndGet();
                    }

                    // No-op: the rescale path must not cancel the job
                    @Override
                    public void cancelJob(
                            FlinkDeployment deployment,
                            UpgradeMode upgradeMode,
                            Configuration conf) {}
                };
        var ctxFactory =
                new TestingFlinkResourceContextFactory(
                        configManager, operatorMetricGroup, nativeService, eventRecorder);

        FlinkDeployment deployment = TestUtils.buildApplicationCluster();

        // Set all the properties required by the rescale api
        deployment.getSpec().setFlinkVersion(FlinkVersion.v1_18);
        deployment.getSpec().setMode(KubernetesDeploymentMode.NATIVE);
        deployment
                .getSpec()
                .getFlinkConfiguration()
                .put(
                        JobManagerOptions.SCHEDULER.key(),
                        JobManagerOptions.SchedulerType.Adaptive.name());
        deployment.getMetadata().setGeneration(1L);

        // Deploy the job and update the status accordingly so we can proceed to rescaling it
        reconciler.reconcile(deployment, context);
        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());

        // Override parallelism for a vertex and trigger rescaling
        deployment
                .getSpec()
                .getFlinkConfiguration()
                .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1.toHexString() + ":2");
        deployment.getMetadata().setGeneration(2L);
        appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
        // Exactly one rescale call, and the last event is a Scaling event
        assertEquals(1, rescaleCounter.get());
        assertEquals(
                EventRecorder.Reason.Scaling.toString(),
                eventCollector.events.getLast().getReason());
        assertEquals(3, eventCollector.events.size());

        // Job should not be stopped, we simply call the rescale api
        assertEquals(JobState.RUNNING, getReconciledJobState(deployment));
        var reconStatus = deployment.getStatus().getReconciliationStatus();
        // The override must be recorded in the last reconciled spec, which stays DEPLOYED
        // but is not yet marked stable after scaling
        assertEquals(
                v1.toHexString() + ":2",
                reconStatus
                        .deserializeLastReconciledSpec()
                        .getFlinkConfiguration()
                        .get(PipelineOptions.PARALLELISM_OVERRIDES.key()));
        assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());
        assertFalse(reconStatus.isLastReconciledSpecStable());

        // Reconciler should not do anything after successful scaling
        appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
        assertEquals(1, rescaleCounter.get());
        assertEquals(3, eventCollector.events.size());
        assertFalse(reconStatus.isLastReconciledSpecStable());
    }
@Test
public void testApplyAutoscalerParallelism() throws Exception {
var ctxFactory =
new TestingFlinkResourceContextFactory(
configManager, operatorMetricGroup, flinkService, eventRecorder);
var overrideFunction = new AtomicReference<Consumer<AbstractFlinkSpec>>(s -> {});
JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler =
new NoopJobAutoscaler<>() {
@Override
public void scale(KubernetesJobAutoScalerContext ctx) {
overrideFunction.get().accept(ctx.getResource().getSpec());
}
};
appReconciler = new ApplicationReconciler(eventRecorder, statusRecorder, autoscaler);
var deployment = TestUtils.buildApplicationCluster();
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
assertFalse(deployment.getStatus().isImmediateReconciliationNeeded());
// Job running verify no upgrades if overrides are empty
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
assertEquals(
ReconciliationState.DEPLOYED,
deployment.getStatus().getReconciliationStatus().getState());
assertEquals("RUNNING", deployment.getStatus().getJobStatus().getState());
// Test overrides are applied correctly
var v1 = new JobVertexID();
overrideFunction.set(
s ->
s.getFlinkConfiguration()
.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":2"));
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
assertEquals(
ReconciliationState.UPGRADING,
deployment.getStatus().getReconciliationStatus().getState());
assertEquals(
Map.of(v1.toHexString(), "2"),
ctxFactory
.getResourceContext(deployment, context)
.getObserveConfig()
.get(PipelineOptions.PARALLELISM_OVERRIDES));
}
@ParameterizedTest
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
public void verifyJobIdNotResetDuringLastStateRecovery(FlinkVersion flinkVersion) {
FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
flinkService.setDeployFailure(true);
try {
reconciler.reconcile(deployment, context);
} catch (Exception expected) {
}
statusRecorder.updateStatusFromCache(deployment);
if (!flinkVersion.isEqualOrNewer(FlinkVersion.v1_16)) {
assertFalse(StringUtils.isBlank(getJobStatus(deployment).getJobId()));
}
}
@Test
public void testSetOwnerReference() throws Exception {
FlinkDeployment flinkApp = TestUtils.buildApplicationCluster();
ObjectMeta deployMeta = flinkApp.getMetadata();
FlinkDeploymentStatus status = flinkApp.getStatus();
FlinkDeploymentSpec spec = flinkApp.getSpec();
Configuration deployConfig = configManager.getDeployConfig(deployMeta, spec);
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconciler
.getReconciler()
.deploy(getResourceContext(flinkApp), spec, deployConfig, Optional.empty(), false);
final List<Map<String, String>> expectedOwnerReferences =
List.of(TestUtils.generateTestOwnerReferenceMap(flinkApp));
List<Map<String, String>> or =
deployConfig.get(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE);
Assertions.assertEquals(expectedOwnerReferences, or);
}
@Test
public void testTerminalJmTtl() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
getJobSpec(deployment).setUpgradeMode(UpgradeMode.SAVEPOINT);
reconciler.reconcile(deployment, context);
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
getJobSpec(deployment).setState(JobState.SUSPENDED);
reconciler.reconcile(deployment, context);
var status = deployment.getStatus();
assertEquals(
org.apache.flink.api.common.JobStatus.FINISHED.toString(),
status.getJobStatus().getState());
assertEquals(JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus());
deployment
.getSpec()
.getFlinkConfiguration()
.put(
KubernetesOperatorConfigOptions.OPERATOR_JM_SHUTDOWN_TTL.key(),
String.valueOf(Duration.ofMinutes(5).toMillis()));
var now = Instant.now();
status.getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));
reconciler
.getReconciler()
.setClock(Clock.fixed(now.plus(Duration.ofMinutes(3)), ZoneId.systemDefault()));
reconciler.reconcile(deployment, context);
assertEquals(JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus());
reconciler
.getReconciler()
.setClock(Clock.fixed(now.plus(Duration.ofMinutes(6)), ZoneId.systemDefault()));
reconciler.reconcile(deployment, context);
assertEquals(JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testClusterCleanupBeforeDeploy(boolean requireMetadata) throws Exception {
var flinkApp = TestUtils.buildApplicationCluster();
var status = flinkApp.getStatus();
var spec = flinkApp.getSpec();
var deployConfig = configManager.getDeployConfig(flinkApp.getMetadata(), spec);
status.getReconciliationStatus().serializeAndSetLastReconciledSpec(spec, flinkApp);
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
var deleted = new AtomicBoolean(false);
flinkService =
new TestingFlinkService() {
@Override
protected void deleteHAData(
String namespace, String clusterId, Configuration conf) {
deleted.set(true);
}
};
reconciler
.getReconciler()
.deploy(
getResourceContext(flinkApp),
spec,
deployConfig,
Optional.empty(),
requireMetadata);
assertEquals(deleted.get(), !requireMetadata);
assertEquals(JobManagerDeploymentStatus.DEPLOYING, status.getJobManagerDeploymentStatus());
}
@Test
public void testDeploymentRecoveryEvent() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
reconciler.reconcile(deployment, context);
Assertions.assertEquals(MSG_SUBMIT, eventCollector.events.remove().getMessage());
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
flinkService.clear();
FlinkDeploymentStatus deploymentStatus = deployment.getStatus();
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
deploymentStatus
.getJobStatus()
.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
reconciler.reconcile(deployment, context);
Assertions.assertEquals(MSG_RECOVERY, eventCollector.events.remove().getMessage());
}
@Test
public void testRestartUnhealthyEvent() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
deployment
.getSpec()
.getFlinkConfiguration()
.put(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED.key(), "true");
reconciler.reconcile(deployment, context);
Assertions.assertEquals(MSG_SUBMIT, eventCollector.events.remove().getMessage());
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
var clusterHealthInfo = new ClusterHealthInfo();
clusterHealthInfo.setTimeStamp(System.currentTimeMillis());
clusterHealthInfo.setNumRestarts(2);
clusterHealthInfo.setHealthy(false);
ClusterHealthEvaluator.setLastValidClusterHealthInfo(
deployment.getStatus().getClusterInfo(), clusterHealthInfo);
reconciler.reconcile(deployment, context);
Assertions.assertEquals(MSG_RESTART_UNHEALTHY, eventCollector.events.remove().getMessage());
}
@Test
public void testReconcileIfUpgradeModeNotAvailable() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
getJobSpec(deployment).setUpgradeMode(UpgradeMode.SAVEPOINT);
// We disable last state fallback as we want to test that the deployment is properly
// recovered before upgrade
deployment
.getSpec()
.getFlinkConfiguration()
.put(
KubernetesOperatorConfigOptions
.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED
.key(),
"false");
// Initial deployment
reconciler.reconcile(deployment, context);
// Trigger upgrade but set jobmanager status to missing -> savepoint upgrade not available
deployment.getSpec().setRestartNonce(123L);
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
flinkService.clear();
reconciler.reconcile(deployment, context);
// We verify that deployment was recovered before upgrade
assertEquals(
JobManagerDeploymentStatus.DEPLOYING,
deployment.getStatus().getJobManagerDeploymentStatus());
var lastReconciledSpec =
deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
assertNotEquals(
deployment.getSpec().getRestartNonce(), lastReconciledSpec.getRestartNonce());
// Set to running to let savepoint upgrade proceed
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
reconciler.reconcile(deployment, context);
// Make sure upgrade is properly triggered now
lastReconciledSpec =
deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
assertEquals(deployment.getSpec().getRestartNonce(), lastReconciledSpec.getRestartNonce());
assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
}
@Test
public void testUpgradeReconciledGeneration() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
deployment.getMetadata().setGeneration(1L);
// Initial deployment
reconciler.reconcile(deployment, context);
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
assertEquals(
1L,
deployment
.getStatus()
.getReconciliationStatus()
.deserializeLastReconciledSpecWithMeta()
.getMeta()
.getMetadata()
.getGeneration());
// Submit no-op upgrade
deployment.getSpec().getFlinkConfiguration().put("kubernetes.operator.test", "value");
deployment.getMetadata().setGeneration(2L);
reconciler.reconcile(deployment, context);
assertEquals(
2L,
deployment
.getStatus()
.getReconciliationStatus()
.deserializeLastReconciledSpecWithMeta()
.getMeta()
.getMetadata()
.getGeneration());
}
    /**
     * Verifies the upgrade mode recorded during rollback: LAST_STATE when the jobmanager had
     * started (HA data available), SAVEPOINT otherwise.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testRollbackUpgradeModeHandling(boolean jmStarted) throws Exception {
        var deployment = TestUtils.buildApplicationCluster();
        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        offsetReconcilerClock(deployment, Duration.ZERO);
        var flinkConfiguration = deployment.getSpec().getFlinkConfiguration();
        // Enable rollback with a short readiness timeout, and disable last-state fallback so
        // the upgrade-mode handling below is exercised explicitly
        flinkConfiguration.put(
                KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED.key(), "true");
        flinkConfiguration.put(
                KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT.key(), "10s");
        flinkConfiguration.put(
                KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED
                        .key(),
                "false");

        // Initial deployment, mark as stable
        reconciler.reconcile(deployment, context);
        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
        deployment.getStatus().getReconciliationStatus().markReconciledSpecAsStable();

        // Submit invalid change
        deployment.getSpec().getJob().setParallelism(9999);
        reconciler.reconcile(deployment, context);
        reconciler.reconcile(deployment, context);
        assertEquals(1, flinkService.listJobs().size());
        // Stable spec keeps STATELESS while the reconciled spec carries the requested SAVEPOINT
        // mode (presumably the initial deploy has no savepoint yet — confirm against reconciler)
        assertEquals(
                UpgradeMode.STATELESS,
                deployment
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastStableSpec()
                        .getJob()
                        .getUpgradeMode());
        assertEquals(
                UpgradeMode.SAVEPOINT,
                deployment
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getUpgradeMode());

        // Trigger rollback by delaying the recovery
        offsetReconcilerClock(deployment, Duration.ofSeconds(15));
        flinkService.setHaDataAvailable(jmStarted);
        flinkService.setJobManagerReady(jmStarted);
        reconciler.reconcile(deployment, context);
        assertEquals(
                ReconciliationState.ROLLING_BACK,
                deployment.getStatus().getReconciliationStatus().getState());
        assertEquals(0, flinkService.listJobs().size());
        assertEquals("FINISHED", deployment.getStatus().getJobStatus().getState());
        // LAST_STATE is only used when the jobmanager started (HA data available)
        assertEquals(
                jmStarted ? UpgradeMode.LAST_STATE : UpgradeMode.SAVEPOINT,
                deployment
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getUpgradeMode());

        // Once the jobmanager is ready the rollback completes and the job restarts
        flinkService.setJobManagerReady(true);
        reconciler.reconcile(deployment, context);
        assertEquals(
                ReconciliationState.ROLLED_BACK,
                deployment.getStatus().getReconciliationStatus().getState());
        assertEquals(1, flinkService.listJobs().size());
        assertEquals("RECONCILING", deployment.getStatus().getJobStatus().getState());
    }
    /**
     * Exercises redeployment from an explicit initial savepoint path (driven by the
     * savepointRedeployNonce) across all upgrade modes and job states. Each step delegates to
     * {@code verifySavepointRedeploy} which bumps the nonce, reconciles and asserts the result.
     */
    @ParameterizedTest
    @EnumSource(UpgradeMode.class)
    public void testSavepointRedeploy(UpgradeMode upgradeMode) throws Exception {
        var deployment = TestUtils.buildApplicationCluster();
        deployment.getSpec().getJob().setUpgradeMode(upgradeMode);
        reconciler.reconcile(deployment, context);
        var runningJobs = flinkService.listJobs();
        verifyAndSetRunningJobsToStatus(deployment, runningJobs);

        // Test savepoint redeploy for running job
        verifySavepointRedeploy(deployment, runningJobs, "sp-t1");
        // Test savepoint redeploy for non-running job, we just deployed
        verifySavepointRedeploy(deployment, runningJobs, "sp-t2");

        // Test redeploy to the same savepoint path
        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        verifySavepointRedeploy(deployment, runningJobs, "sp-t2");

        // Null initialSavepoint path is not allowed. Normally caught during validation
        assertThrows(
                NullPointerException.class,
                () -> verifySavepointRedeploy(deployment, runningJobs, null));

        // Test savepoint redeploy when jobstate is set to suspended
        deployment.getSpec().getJob().setState(JobState.SUSPENDED);
        verifySavepointRedeploy(deployment, runningJobs, "sp-t3");

        if (upgradeMode != UpgradeMode.STATELESS) {
            // When we suspended with a new initial savepoint path simple spec changes should use
            // the correct savepoint. This doesn't apply to stateless mode as that starts from empty
            // state after suspend
            deployment.getSpec().getJob().setParallelism(321);
            verifySavepointRedeploy(deployment, runningJobs, "sp-t3");

            deployment.getSpec().getJob().setState(JobState.SUSPENDED);
            reconciler.reconcile(deployment, context);
            assertEquals(
                    JobManagerDeploymentStatus.MISSING,
                    deployment.getStatus().getJobManagerDeploymentStatus());

            // Test suspend and a new initialSavepointPath
            deployment.getSpec().getJob().setState(JobState.RUNNING);
            verifySavepointRedeploy(deployment, runningJobs, "sp-t4");
        }
    }
private void verifySavepointRedeploy(
FlinkDeployment deployment,
List<Tuple3<String, JobStatusMessage, Configuration>> runningJobs,
String savepoint)
throws Exception {
var job = deployment.getSpec().getJob();
job.setInitialSavepointPath(savepoint);
job.setSavepointRedeployNonce(
Optional.ofNullable(job.getSavepointRedeployNonce()).orElse(0L) + 1);
reconciler.reconcile(deployment, context);
boolean shouldRun = deployment.getSpec().getJob().getState() == JobState.RUNNING;
if (shouldRun) {
// Verify job is redeployed with sp
assertEquals(1, runningJobs.size());
assertEquals(savepoint, runningJobs.get(0).f0);
} else {
// Verify that job is stopped
assertTrue(runningJobs.isEmpty());
}
var status = deployment.getStatus();
assertEquals(
shouldRun
? JobManagerDeploymentStatus.DEPLOYING
: JobManagerDeploymentStatus.MISSING,
status.getJobManagerDeploymentStatus());
// Verify that savepoint and upgrade mode is recorded correctly in reconciled spec
assertEquals(
savepoint,
status.getJobStatus().getSavepointInfo().getLastSavepoint().getLocation());
assertEquals(
UpgradeMode.SAVEPOINT,
status.getReconciliationStatus()
.deserializeLastReconciledSpec()
.getJob()
.getUpgradeMode());
assertTrue(status.getReconciliationStatus().isLastReconciledSpecStable());
}
private void offsetReconcilerClock(FlinkDeployment dep, Duration offset) {
testClock = Clock.offset(testClock, offset);
appReconciler.setClock(testClock);
}
}