/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.diff.DiffResult;
import org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
/**
 * Base class for all Flink resource reconcilers. It contains the general flow of reconciling Flink
 * related resources including initial deployments, upgrades, rollbacks etc.
 */
public abstract class AbstractFlinkResourceReconciler<
CR extends AbstractFlinkResource<SPEC, STATUS>,
SPEC extends AbstractFlinkSpec,
STATUS extends CommonStatus<SPEC>>
implements Reconciler<CR> {
private static final Logger LOG =
protected final EventRecorder eventRecorder;
protected final StatusRecorder<CR, STATUS> statusRecorder;
protected final JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler;
public static final String MSG_SUSPENDED = "Suspending existing deployment.";
public static final String MSG_SPEC_CHANGED =
"%s change(s) detected (%s), starting reconciliation.";
public static final String MSG_ROLLBACK = "Rolling back failed deployment.";
public static final String MSG_SUBMIT = "Starting deployment";
protected Clock clock = Clock.systemDefaultZone();
/**
 * Creates the reconciler and stores the collaborators shared by all subclasses.
 *
 * @param eventRecorder Event recorder stored in {@link #eventRecorder}.
 * @param statusRecorder Status recorder stored in {@link #statusRecorder}.
 * @param autoscaler Autoscaler stored in {@link #autoscaler}.
 */
public AbstractFlinkResourceReconciler(
        EventRecorder eventRecorder,
        StatusRecorder<CR, STATUS> statusRecorder,
        JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler) {
    this.eventRecorder = eventRecorder;
    this.statusRecorder = statusRecorder;
    this.autoscaler = autoscaler;
}
/**
 * Main reconciliation flow for a single resource: readiness check, first deployment, spec diff
 * against the last reconciled spec, optional rollback, in-place scaling or spec change
 * reconciliation, and finally any reconciler specific changes.
 *
 * @param ctx Reconciliation context.
 * @throws Exception Error during reconciliation.
 */
// NOTE(review): this method is incomplete in this copy of the file. Closing braces, several
// statement heads and at least one operand are missing (each spot is marked below). Restore the
// method from version control before changing its logic.
public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
var cr = ctx.getResource();
var status = cr.getStatus();
var reconciliationStatus = cr.getStatus().getReconciliationStatus();
// If the resource is not ready for reconciliation we simply return
// NOTE(review): the call wrapping the string literal below (presumably a logging call) and the
// early return described by the comment above are missing.
if (!readyToReconcile(ctx)) {"Not ready for reconciliation yet...");
// If this is the first deployment for the resource we simply submit the job and return.
// No further logic is required at this point.
if (reconciliationStatus.isBeforeFirstDeployment()) {
var spec = cr.getSpec();
// If the job is submitted in suspend state, no need to reconcile
// NOTE(review): the body of this if (presumably an early return) and the head of the call
// wrapping the next string literal are missing.
if (spec.getJob() != null && spec.getJob().getState().equals(JobState.SUSPENDED)) {
}"Deploying for the first time");
var deployConfig = ctx.getDeployConfig(spec);
// NOTE(review): callee name missing. The argument list matches the parameters of
// updateStatusBeforeFirstDeployment(cr, spec, deployConfig, status, client) — confirm.
// No call to deploy(...) is visible in this branch either, although the branch comment says the
// job is submitted here.
cr, spec, deployConfig, status, ctx.getKubernetesClient());
ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig, clock);
// NOTE(review): the initializer of lastReconciledSpec is missing.
SPEC lastReconciledSpec =
SPEC currentDeploySpec = cr.getSpec();
var reconciliationState = reconciliationStatus.getState();
// Diff between the last reconciled spec and the current spec for the given deployment mode.
// NOTE(review): the builder expression is not terminated (no terminal call / semicolon visible).
var specDiff =
new ReflectiveDiffBuilder<>(
ctx.getDeploymentMode(), lastReconciledSpec, currentDeploySpec)
var diffType = specDiff.getType();
// A change is pending when the diff is not IGNORE or the resource is still in UPGRADING state.
boolean specChanged =
DiffType.IGNORE != diffType || reconciliationState == ReconciliationState.UPGRADING;
// When a rollback is required the resource is prepared for it and the change is then handled
// as an UPGRADE type spec change.
if (shouldRollBack(ctx, specChanged, lastReconciledSpec)) {
prepareCrForRollback(ctx, specChanged, lastReconciledSpec);
specChanged = true;
diffType = DiffType.UPGRADE;
if (specChanged) {
var deployConfig = ctx.getDeployConfig(cr.getSpec());
// NOTE(review): the body of this if is missing (presumably an early return when the new spec
// is already deployed).
if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
triggerSpecChangeEvent(cr, specDiff, ctx.getKubernetesClient());
// Try scaling if this is not an upgrade/redeploy change
// NOTE(review): the first operand of the && chain below is missing.
boolean scaled =
&& diffType != DiffType.UPGRADE
&& scale(ctx, deployConfig);
// Reconcile spec change unless scaling was enough
if (scaled || reconcileSpecChange(diffType, ctx, deployConfig, lastReconciledSpec)) {
// If we executed a scale or spec upgrade action we return, otherwise we
// continue to reconcile other changes
// NOTE(review): the return described above and the body of the else branch are missing.
} else {
// NOTE(review): the head of the call wrapping the string literal below is missing.
if (!reconcileOtherChanges(ctx)) {"Resource fully reconciled, nothing to do...");
/**
 * Prepares the autoscaler context of the resource. The AUTOSCALER_ENABLED flag in the context
 * configuration is set to true only if the resource spec has a job and AUTOSCALER_ENABLED is
 * true in the observe config.
 *
 * @param ctx Reconciliation context.
 * @throws Exception Declared by the method; no throwing call is visible in the lines below.
 */
private void applyAutoscaler(FlinkResourceContext<CR> ctx) throws Exception {
var autoScalerCtx = ctx.getJobAutoScalerContext();
boolean autoscalerEnabled =
ctx.getResource().getSpec().getJob() != null
&& ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled);
// NOTE(review): the method ends here in this copy of the file: the `autoscaler` field is never
// invoked and the closing brace is missing. The tail looks truncated — restore it from version
// control.
/**
 * Handles the notification for a detected spec change. The visible code builds a message from
 * MSG_SPEC_CHANGED (diff type and diff) and a "SpecChange: generation" string.
 *
 * @param cr Resource whose spec changed.
 * @param specDiff Diff between the last reconciled and the current spec.
 * @param client Kubernetes client.
 */
private void triggerSpecChangeEvent(CR cr, DiffResult<SPEC> specDiff, KubernetesClient client) {
if (DiffType.IGNORE == specDiff.getType()) {
// This can happen if an ignore change comes in while we are waiting in upgrading state
// for scaling completion
// NOTE(review): the body of this if (presumably an early return) and the head of the call that
// takes the two arguments below are missing; `client` is unused in the visible lines.
String.format(MSG_SPEC_CHANGED, specDiff.getType(), specDiff),
"SpecChange: " + cr.getMetadata().getGeneration(),
/**
 * Update the status before the first deployment. We have to record the upgrade mode based on
 * the initial savepoint path provided, and record the to-be-deployed spec in the status.
 *
 * @param cr Related flink resource
 * @param spec Spec to be deployed
 * @param deployConfig Deploy configuration
 * @param status Resource status
 * @param client Kubernetes client used when patching the status
 */
// NOTE(review): this method is incomplete in this copy of the file (closing braces and parts of
// statements are missing, see the notes below). Restore it from version control.
private void updateStatusBeforeFirstDeployment(
CR cr, SPEC spec, Configuration deployConfig, STATUS status, KubernetesClient client) {
if (spec.getJob() != null) {
// Default to STATELESS; switched to SAVEPOINT below when an initial savepoint path is given.
var initialUpgradeMode = UpgradeMode.STATELESS;
var initialSp = spec.getJob().getInitialSavepointPath();
if (initialSp != null) {
// NOTE(review): the receiver expression of the call below is missing.
.setLastSavepoint(Savepoint.of(initialSp, SnapshotTriggerType.UNKNOWN));
initialUpgradeMode = UpgradeMode.SAVEPOINT;
// NOTE(review): initialUpgradeMode is never read in the visible lines; the statement that
// applies it appears to be missing.
ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig, clock);
// Before we try to submit the job we record the current spec in the status so we can
// handle subsequent deployment and status update errors
statusRecorder.patchAndCacheStatus(cr, client);
* Check whether the given Flink resource is ready to be reconciled or we are still waiting for
* any pending operation or condition first.
* @param ctx Reconciliation context.
* @return True if the resource is ready to be reconciled.
protected abstract boolean readyToReconcile(FlinkResourceContext<CR> ctx);
* Reconcile spec upgrade on the currently deployed/suspended Flink resource and update the
* status accordingly.
* @param diffType SpecChange diff type.
* @param ctx Reconciliation context.
* @param deployConfig Deployment configuration.
* @param lastReconciledSpec Last reconciled spec
* @throws Exception Error during spec upgrade.
* @return True if spec change reconciliation was executed
protected abstract boolean reconcileSpecChange(
DiffType diffType,
FlinkResourceContext<CR> ctx,
Configuration deployConfig,
SPEC lastReconciledSpec)
throws Exception;
* Reconcile any other changes required for this resource that are specific to the reconciler
* implementation.
* @param ctx Reconciliation context.
* @return True if any further reconciliation action was taken.
* @throws Exception Error during reconciliation.
protected abstract boolean reconcileOtherChanges(FlinkResourceContext<CR> ctx) throws Exception;
public DeleteControl cleanup(FlinkResourceContext<CR> ctx) {
return cleanupInternal(ctx);
* Deploys the target resource spec to Kubernetes.
* @param ctx Reconciliation context.
* @param spec Spec that should be deployed to Kubernetes.
* @param deployConfig Flink conf for the deployment.
* @param savepoint Optional savepoint path for applications and session jobs.
* @param requireHaMetadata Flag used by application deployments to validate HA metadata
* @throws Exception Error during deployment.
public abstract void deploy(
FlinkResourceContext<CR> ctx,
SPEC spec,
Configuration deployConfig,
Optional<String> savepoint,
boolean requireHaMetadata)
throws Exception;
* Shut down and clean up all Flink job/cluster resources.
* @param ctx Current context.
* @return DeleteControl object.
protected abstract DeleteControl cleanupInternal(FlinkResourceContext<CR> ctx);
* Checks whether the desired spec already matches the currently deployed spec. If they match
* the resource status is updated to reflect successful reconciliation.
* @param resource Resource being reconciled.
* @param deployConf Deploy configuration for the Flink resource.
* @return True if desired spec was already deployed.
private boolean checkNewSpecAlreadyDeployed(CR resource, Configuration deployConf) {
if (resource.getStatus().getReconciliationStatus().getState()
== ReconciliationState.UPGRADING
|| resource.getStatus().getReconciliationStatus().getState()
== ReconciliationState.ROLLING_BACK) {
return false;
AbstractFlinkSpec deployedSpec = ReconciliationUtils.getDeployedSpec(resource);
if (resource.getSpec().equals(deployedSpec)) {
"The new spec matches the currently deployed last stable spec. No upgrade needed.");
ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConf, clock);
return true;
return false;
* Scale the cluster in-place if possible, either through reactive scaling or declarative
* resources.
* @param ctx Resource context.
* @param deployConfig Configuration to be deployed.
* @return True if the scaling is successful
* @throws Exception
private boolean scale(FlinkResourceContext<CR> ctx, Configuration deployConfig)
throws Exception {
var scaled = ctx.getFlinkService().scale(ctx, deployConfig);
if (scaled) {
ReconciliationUtils.updateStatusForDeployedSpec(ctx.getResource(), deployConfig, clock);
return scaled;
/**
 * Checks whether the currently deployed Flink resource spec should be rolled back to the stable
 * spec. This includes validating the current deployment status, config and checking if the last
 * reconciled spec did not become stable within the configured grace period.
 *
 * <p>Rollbacks are only supported to previously running resource specs with HA enabled.
 *
 * @param ctx Reconciliation context.
 * @param specChanged Flag indicating whether the spec changed
 * @param lastReconciledSpec Last reconciled spec of the resource
 * @return True if the resource should be rolled back.
 */
// NOTE(review): this method is incomplete in this copy of the file. All closing braces are
// missing and the readiness timeout check is truncated (see below). Restore it from version
// control before changing the logic.
private boolean shouldRollBack(
FlinkResourceContext<CR> ctx, boolean specChanged, SPEC lastReconciledSpec) {
var resource = ctx.getResource();
var reconciliationStatus = resource.getStatus().getReconciliationStatus();
var configuration = ctx.getObserveConfig();
// A rollback that is already in progress always continues.
if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
return true;
// A changed spec is never rolled back.
if (specChanged) {
return false;
// No rollback if the feature is disabled, the resource was already rolled back, or the last
// reconciled spec is stable.
if (!configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED)
|| reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK
|| reconciliationStatus.isLastReconciledSpecStable()) {
return false;
var lastStableSpec = reconciliationStatus.deserializeLastStableSpec();
if (lastStableSpec == null) {
// Nothing to roll back to yet
return false;
if (lastStableSpec.getJob() != null
&& lastStableSpec.getJob().getState() == JobState.SUSPENDED) {
// Should not roll back to suspended state
return false;
if (flinkVersionChanged(resource.getSpec(), lastStableSpec)) {
// Should not roll back Flink version changes
return false;
// NOTE(review): the initializer of readinessTimeout is missing and the variable is not used in
// the comparison below, which as written compares the current instant directly with the
// reconciliation timestamp. Part of this expression appears to have been lost.
Duration readinessTimeout =
if (!clock.instant()
.isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()))) {
return false;
if (lastReconciledSpec.getJob() != null
&& lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.SAVEPOINT
&& FlinkUtils.jmPodNeverStarted(ctx.getJosdkContext())) {
// HA data not available as JM never start and relying on SAVEPOINT upgrade mode
// Safe to rollback relying on savepoint
return true;
// Otherwise a rollback is only possible when HA metadata is available.
var haDataAvailable = ctx.getFlinkService().isHaMetadataAvailable(configuration);
if (!haDataAvailable) {
LOG.warn("Rollback is not possible due to missing HA metadata");
return haDataAvailable;
/**
 * Prepares the resource for a rollback before the spec change flow runs.
 *
 * @param ctx Reconciliation context.
 * @param specChanged Flag indicating whether the spec changed.
 * @param lastReconciledSpec Last reconciled spec of the resource.
 */
// NOTE(review): most statement bodies of this method are missing in this copy of the file; only
// the branch structure and the original comments survive. Restore it from version control.
private void prepareCrForRollback(
FlinkResourceContext<CR> ctx, boolean specChanged, SPEC lastReconciledSpec) {
var cr = ctx.getResource();
var status = cr.getStatus();
var reconciliationStatus = status.getReconciliationStatus();
if (reconciliationStatus.getState() != ReconciliationState.ROLLING_BACK) {
// When we initiate rollback we trigger a one time event
// NOTE(review): the statements of this branch are missing.
} else {
if (lastReconciledSpec.getJob() != null) {
// The rollback SUSPENDED status is not recorded anywhere currently. Since the
// reconciler looks at the lastReconciled spec state to decide on the next action
// (cancel vs deploy) this is a simple trick to make the rollback flow work
// correctly.
// NOTE(review): the statement implementing the trick described above is missing.
if (specChanged) {
// If spec has changed while rolling back we should apply new spec and move to upgrading
// state to break out of the rollback flow.
// NOTE(review): the statements of this branch are missing.
} else {
var job = cr.getSpec().getJob();
if (job != null) {
// The last stable spec may have a completely different upgrade mode, then what we
// used the last time. We set it based on the lastReconciledSpec
// NOTE(review): the head of the call taking the conditional expression below is missing;
// the expression yields STATELESS if the last reconciled job was STATELESS, else LAST_STATE.
lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.STATELESS
? UpgradeMode.STATELESS
: UpgradeMode.LAST_STATE);
* Checks whether the JobManager Kubernetes Deployment recovery logic should be initiated. This
* is triggered only if, jm deployment missing, recovery config and HA enabled. This logic is
* only used by the Session and Application reconcilers.
* @param conf Flink cluster configuration.
* @param deployment FlinkDeployment object.
* @return True if recovery should be executed.
protected boolean shouldRecoverDeployment(Configuration conf, FlinkDeployment deployment) {
boolean result = false;
if (conf.get(KubernetesOperatorConfigOptions.OPERATOR_JM_DEPLOYMENT_RECOVERY_ENABLED)) {
LOG.debug("Checking whether jobmanager deployment needs recovery");
if (jmMissingForRunningDeployment(deployment)) {
LOG.debug("Jobmanager deployment is missing, trying to recover");
var jobSpec = deployment.getSpec().getJob();
boolean stateless =
jobSpec != null && jobSpec.getUpgradeMode() == UpgradeMode.STATELESS;
if (stateless || HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) {
LOG.debug("HA is enabled, recovering lost jobmanager deployment");
result = true;
} else {
LOG.warn("Could not recover lost jobmanager deployment without HA enabled");
return result;
private boolean jmMissingForRunningDeployment(FlinkDeployment deployment) {
var deployedJob = ReconciliationUtils.getDeployedSpec(deployment).getJob();
return (deployedJob == null || deployedJob.getState() == JobState.RUNNING)
&& (deployment.getStatus().getJobManagerDeploymentStatus()
== JobManagerDeploymentStatus.MISSING);
protected boolean flinkVersionChanged(SPEC oldSpec, SPEC newSpec) {
if (oldSpec instanceof FlinkDeploymentSpec) {
return ((FlinkDeploymentSpec) oldSpec).getFlinkVersion()
!= ((FlinkDeploymentSpec) newSpec).getFlinkVersion();
return false;
protected void setOwnerReference(CR owner, Configuration deployConfig) {
final Map<String, String> ownerReference =
"apiVersion", owner.getApiVersion(),
"kind", owner.getKind(),
"name", owner.getMetadata().getName(),
"uid", owner.getMetadata().getUid(),
"blockOwnerDeletion", "true",
"controller", "false");
KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE, List.of(ownerReference));
public void setClock(Clock clock) {
this.clock = clock;