blob: f141239e026328216e03c7593c127ac623a4e64a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.operator.health;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/** Logic encapsulating canary tests. */
@RequiredArgsConstructor
public class CanaryResourceManager<CR extends AbstractFlinkResource<?, ?>> {
private static final Logger LOG = LoggerFactory.getLogger(CanaryResourceManager.class);
public static final String CANARY_LABEL = "flink.apache.org/canary";
private final ConcurrentHashMap<ResourceID, CanaryResourceState> canaryResources =
new ConcurrentHashMap<>();
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
private final FlinkConfigManager configManager;
public boolean allCanariesHealthy() {
return canaryResources.values().stream().allMatch(cr -> cr.isHealthy);
}
public boolean handleCanaryResourceReconciliation(CR resource, KubernetesClient client) {
if (!isCanaryResource(resource)) {
return false;
}
var resourceId = ResourceID.fromResource(resource);
LOG.info("Reconciling canary resource");
canaryResources.compute(
resourceId,
(id, previousState) -> {
boolean firstReconcile = false;
if (previousState == null) {
firstReconcile = true;
previousState = new CanaryResourceState();
}
previousState.onReconcile(resource);
if (firstReconcile) {
updateSpecAndScheduleHealthCheck(resourceId, previousState, client);
}
return previousState;
});
return true;
}
public boolean handleCanaryResourceDeletion(CR resource) {
if (!isCanaryResource(resource)) {
return false;
}
var resourceId = ResourceID.fromResource(resource);
LOG.info("Deleting canary resource");
canaryResources.remove(resourceId);
return true;
}
private void updateSpecAndScheduleHealthCheck(
ResourceID resourceID, CanaryResourceState crs, KubernetesClient client) {
var canaryTimeout =
configManager
.getDefaultConfig()
.get(KubernetesOperatorConfigOptions.CANARY_RESOURCE_TIMEOUT);
Long restartNonce = crs.resource.getSpec().getRestartNonce();
crs.resource.getSpec().setRestartNonce(restartNonce == null ? 1L : restartNonce + 1);
crs.previousGeneration = crs.resource.getMetadata().getGeneration();
LOG.info("Scheduling canary check for {} in {}s", resourceID, canaryTimeout.toSeconds());
try {
client.resource(ReconciliationUtils.clone(crs.resource)).replace();
} catch (Throwable t) {
LOG.warn("Could not bump canary deployment, it may have been deleted", t);
}
executorService.schedule(
() -> checkHealth(resourceID, client),
canaryTimeout.toMillis(),
TimeUnit.MILLISECONDS);
}
@VisibleForTesting
protected void checkHealth(ResourceID resourceID, KubernetesClient client) {
CanaryResourceState crs = canaryResources.get(resourceID);
if (crs == null) {
LOG.info("Canary resource {} not found. Stopping health checks", resourceID);
return;
}
// We did not reconcile since the last spec update within deadline
if (crs.canaryReconciledSinceUpdate()) {
LOG.info("Canary deployment healthy");
crs.isHealthy = true;
} else {
LOG.error(
"Canary deployment {} latest spec not reconciled. Expected generation larger than {}, received {}",
resourceID,
crs.previousGeneration,
crs.resource.getMetadata().getGeneration());
crs.isHealthy = false;
}
// Update spec and reschedule health check
updateSpecAndScheduleHealthCheck(resourceID, crs, client);
}
@VisibleForTesting
public int getNumberOfActiveCanaries() {
return canaryResources.size();
}
public static boolean isCanaryResource(HasMetadata resource) {
var labels = resource.getMetadata().getLabels();
if (labels == null) {
return false;
}
return "true".equalsIgnoreCase(labels.getOrDefault(CANARY_LABEL, "false"));
}
private class CanaryResourceState {
CR resource;
long previousGeneration;
boolean isHealthy = true;
void onReconcile(CR cr) {
resource = cr;
}
boolean canaryReconciledSinceUpdate() {
return resource.getMetadata().getGeneration() > previousGeneration;
}
}
}