/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.kubernetes.cluster.lock;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.camel.CamelContext;
import org.apache.camel.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Monitors the current status and participates in leader election when no
* active leader is present. It communicates changes in leadership and cluster
* membership to the given event handler.
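*
* A minimal usage sketch (the configuration setters and values shown below are
* illustrative assumptions, not guaranteed API of this package):
*
* <pre>{@code
* KubernetesLockConfiguration config = new KubernetesLockConfiguration();
* config.setGroupName("my-group");              // assumed setter
* config.setPodName(System.getenv("HOSTNAME")); // assumed setter
*
* KubernetesLeadershipController controller = new KubernetesLeadershipController(
*     camelContext, kubernetesClient, config, eventHandler);
* controller.start();
* // ... later, on shutdown:
* controller.stop();
* }</pre>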
*/
public class KubernetesLeadershipController implements Service {
private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeadershipController.class);
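/**
 * Internal state machine: from NOT_LEADER the pod jumps straight to LEADER
 * when no previous leader existed, otherwise it passes through BECOMING_LEADER
 * (waiting one lease duration for the old leader to shut down) before reaching
 * LEADER; losing the lock brings it back to NOT_LEADER.
 */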
private enum State {
NOT_LEADER, BECOMING_LEADER, LEADER
}
private CamelContext camelContext;
private KubernetesClient kubernetesClient;
private KubernetesLockConfiguration lockConfiguration;
private KubernetesClusterEventHandler eventHandler;
private State currentState = State.NOT_LEADER;
private ScheduledExecutorService serializedExecutor;
private TimedLeaderNotifier leaderNotifier;
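// Latest snapshot of the lock data pulled from the Kubernetes API; written by
// the serialized executor, volatile for visibility from other threads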
private volatile LeaderInfo latestLeaderInfo;
private volatile ConfigMap latestConfigMap;
private volatile Set<String> latestMembers;
public KubernetesLeadershipController(CamelContext camelContext, KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration,
KubernetesClusterEventHandler eventHandler) {
this.camelContext = camelContext;
this.kubernetesClient = kubernetesClient;
this.lockConfiguration = lockConfiguration;
this.eventHandler = eventHandler;
}
@Override
public void start() {
if (serializedExecutor == null) {
LOG.debug("{} Starting leadership controller...", logPrefix());
serializedExecutor = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "CamelKubernetesLeadershipController");
leaderNotifier = new TimedLeaderNotifier(this.camelContext, this.eventHandler);
leaderNotifier.start();
serializedExecutor.execute(this::refreshStatus);
}
}
@Override
public void stop() {
LOG.debug("{} Stopping leadership controller...", logPrefix());
if (serializedExecutor != null) {
serializedExecutor.shutdownNow();
}
serializedExecutor = null;
if (leaderNotifier != null) {
leaderNotifier.stop();
}
leaderNotifier = null;
}
private void refreshStatus() {
switch (currentState) {
case NOT_LEADER:
refreshStatusNotLeader();
break;
case BECOMING_LEADER:
refreshStatusBecomingLeader();
break;
case LEADER:
refreshStatusLeader();
break;
default:
throw new RuntimeException("Unsupported state " + currentState);
}
}
/**
* This pod is currently not the leader. It should monitor the leader
* configuration and try to acquire the leadership if possible.
*/
private void refreshStatusNotLeader() {
LOG.debug("{} Pod is not leader, pulling new data from the cluster", logPrefix());
boolean pulled = lookupNewLeaderInfo();
if (!pulled) {
rescheduleAfterDelay();
return;
}
if (this.latestLeaderInfo.hasEmptyLeader()) {
// There is no previous leader
LOG.info("{} The cluster has no leaders. Trying to acquire the leadership...", logPrefix());
boolean acquired = tryAcquireLeadership();
if (acquired) {
LOG.info("{} Leadership acquired by current pod with immediate effect", logPrefix());
this.currentState = State.LEADER;
this.serializedExecutor.execute(this::refreshStatus);
return;
} else {
LOG.info("{} Unable to acquire the leadership, it may have been acquired by another pod", logPrefix());
}
} else if (!this.latestLeaderInfo.hasValidLeader()) {
// There's a previous leader and it's invalid
LOG.info("{} Leadership has been lost by old owner. Trying to acquire the leadership...", logPrefix());
boolean acquired = tryAcquireLeadership();
if (acquired) {
LOG.info("{} Leadership acquired by current pod", logPrefix());
this.currentState = State.BECOMING_LEADER;
this.serializedExecutor.execute(this::refreshStatus);
return;
} else {
LOG.info("{} Unable to acquire the leadership, it may have been acquired by another pod", logPrefix());
}
} else if (this.latestLeaderInfo.isValidLeader(this.lockConfiguration.getPodName())) {
// We are already the leader for some reason (e.g. pod restart after a failure)
LOG.info("{} Leadership is already owned by current pod", logPrefix());
this.currentState = State.BECOMING_LEADER;
this.serializedExecutor.execute(this::refreshStatus);
return;
}
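// Not the leader: propagate the current (possibly empty) leader and the member list to the handler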
this.leaderNotifier.refreshLeadership(Optional.ofNullable(this.latestLeaderInfo.getLeader()), System.currentTimeMillis(), this.lockConfiguration.getLeaseDurationMillis(),
this.latestLeaderInfo.getMembers());
rescheduleAfterDelay();
}
/**
* This pod has acquired the leadership, but it should wait for the old
* leader to tear down resources before starting the local services.
*/
private void refreshStatusBecomingLeader() {
// Always wait the same amount of time before becoming the leader.
// Even if the current pod is already the leader, we should give a possible
// old instance of the pod time to shut down.
long delay = this.lockConfiguration.getLeaseDurationMillis();
LOG.info("{} Current pod owns the leadership, but it will be effective in {} seconds...", logPrefix(),
new BigDecimal(delay).divide(BigDecimal.valueOf(1000), 2, BigDecimal.ROUND_HALF_UP));
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
LOG.warn("Thread interrupted", e);
// Restore the interrupt flag so the shutdown request is not lost
Thread.currentThread().interrupt();
}
LOG.info("{} Current pod is becoming the new leader now...", logPrefix());
this.currentState = State.LEADER;
this.serializedExecutor.execute(this::refreshStatus);
}
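/**
 * This pod is the current leader. It periodically re-reads the lock data to
 * renew the lease and steps down as soon as the data shows another leader.
 */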
private void refreshStatusLeader() {
LOG.debug("{} Pod should be the leader, pulling new data from the cluster", logPrefix());
long timeBeforePulling = System.currentTimeMillis();
boolean pulled = lookupNewLeaderInfo();
if (!pulled) {
rescheduleAfterDelay();
return;
}
if (this.latestLeaderInfo.isValidLeader(this.lockConfiguration.getPodName())) {
LOG.debug("{} Current Pod is still the leader", logPrefix());
this.leaderNotifier.refreshLeadership(Optional.of(this.lockConfiguration.getPodName()), timeBeforePulling, this.lockConfiguration.getRenewDeadlineMillis(),
this.latestLeaderInfo.getMembers());
rescheduleAfterDelay();
return;
} else {
LOG.debug("{} Current Pod has lost the leadership", logPrefix());
this.currentState = State.NOT_LEADER;
// Set an empty leader to signal the leadership loss
this.leaderNotifier.refreshLeadership(Optional.empty(), System.currentTimeMillis(), lockConfiguration.getLeaseDurationMillis(), this.latestLeaderInfo.getMembers());
// Restart from scratch to re-acquire the leadership
this.serializedExecutor.execute(this::refreshStatus);
}
}
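/**
 * Schedules the next status refresh after the configured retry period, adding
 * random jitter so that the pods do not retry in lockstep.
 */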
private void rescheduleAfterDelay() {
this.serializedExecutor.schedule(this::refreshStatus, jitter(this.lockConfiguration.getRetryPeriodMillis(), this.lockConfiguration.getJitterFactor()),
TimeUnit.MILLISECONDS);
}
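/**
 * Pulls the lock ConfigMap and the current set of cluster members from the
 * Kubernetes API and updates the internal snapshot. Returns false if either
 * call fails, so that the caller can reschedule and retry.
 */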
private boolean lookupNewLeaderInfo() {
LOG.debug("{} Looking up leadership information...", logPrefix());
ConfigMap configMap;
try {
configMap = pullConfigMap();
} catch (Throwable e) {
LOG.warn(logPrefix() + " Unable to retrieve the current ConfigMap " + this.lockConfiguration.getConfigMapName() + " from Kubernetes");
LOG.debug(logPrefix() + " Exception thrown during ConfigMap lookup", e);
return false;
}
Set<String> members;
try {
members = Objects.requireNonNull(pullClusterMembers(), "Retrieved a null set of members");
} catch (Throwable e) {
LOG.warn(logPrefix() + " Unable to retrieve the list of cluster members from Kubernetes");
LOG.debug(logPrefix() + " Exception thrown during Pod list lookup", e);
return false;
}
updateLatestLeaderInfo(configMap, members);
return true;
}
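/**
 * Tries to write the current pod as leader into the lock ConfigMap, working on
 * a local snapshot of the volatile fields. Concurrent attempts by other pods
 * are detected optimistically through the ConfigMap resourceVersion: a failed
 * replace simply means another pod won the race.
 */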
private boolean tryAcquireLeadership() {
LOG.debug("{} Trying to acquire the leadership...", logPrefix());
ConfigMap configMap = this.latestConfigMap;
Set<String> members = this.latestMembers;
LeaderInfo latestLeaderInfo = this.latestLeaderInfo;
if (latestLeaderInfo == null || members == null) {
LOG.warn(logPrefix() + " Unexpected condition. Latest leader info or list of members is empty.");
return false;
} else if (!members.contains(this.lockConfiguration.getPodName())) {
LOG.warn(logPrefix() + " The list of cluster members " + latestLeaderInfo.getMembers() + " does not contain the current Pod. Cannot acquire" + " leadership.");
return false;
}
// Info we would set in the ConfigMap to become the leader
LeaderInfo newLeaderInfo = new LeaderInfo(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName(), new Date(), members);
if (configMap == null) {
// No ConfigMap created so far
LOG.debug("{} Lock configmap is not present in the Kubernetes namespace. A new ConfigMap will be created", logPrefix());
ConfigMap newConfigMap = ConfigMapLockUtils.createNewConfigMap(this.lockConfiguration.getConfigMapName(), newLeaderInfo);
try {
kubernetesClient.configMaps().inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)).create(newConfigMap);
LOG.debug("{} ConfigMap {} successfully created", logPrefix(), this.lockConfiguration.getConfigMapName());
updateLatestLeaderInfo(newConfigMap, members);
return true;
} catch (Exception ex) {
// Suppress the exception: creation may legitimately fail when another pod wins the race
LOG.warn("{} Unable to create the ConfigMap, it may have been created by other cluster members concurrently. "
+ "If the problem persists, check that the service account has the permissions to create it", logPrefix());
LOG.debug("{} Exception while trying to create the ConfigMap", logPrefix(), ex);
return false;
}
} else {
LOG.debug("{} Lock configmap already present in the Kubernetes namespace. Checking...", logPrefix());
LeaderInfo leaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, members, this.lockConfiguration.getGroupName());
boolean canAcquire = !leaderInfo.hasValidLeader();
if (canAcquire) {
// Try to be the new leader
try {
ConfigMap updatedConfigMap = ConfigMapLockUtils.getConfigMapWithNewLeader(configMap, newLeaderInfo);
kubernetesClient.configMaps().inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
.withName(this.lockConfiguration.getConfigMapName()).lockResourceVersion(configMap.getMetadata().getResourceVersion()).replace(updatedConfigMap);
LOG.debug("{} ConfigMap {} successfully updated", logPrefix(), this.lockConfiguration.getConfigMapName());
updateLatestLeaderInfo(updatedConfigMap, members);
return true;
} catch (Exception ex) {
LOG.warn(logPrefix() + " Unable to update the lock ConfigMap to set leadership information");
LOG.debug(logPrefix() + " Error received during configmap lock replace", ex);
return false;
}
} else {
// Another pod is the leader and it's still active
LOG.debug("{} Another Pod ({}) is the current leader and it is still active", logPrefix(), this.latestLeaderInfo.getLeader());
return false;
}
}
}
private void updateLatestLeaderInfo(ConfigMap configMap, Set<String> members) {
LOG.debug("{} Updating internal status about the current leader", logPrefix());
this.latestConfigMap = configMap;
this.latestMembers = members;
this.latestLeaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, members, this.lockConfiguration.getGroupName());
LOG.debug("{} Current leader info: {}", logPrefix(), this.latestLeaderInfo);
}
private ConfigMap pullConfigMap() {
return kubernetesClient.configMaps().inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
.withName(this.lockConfiguration.getConfigMapName()).get();
}
private Set<String> pullClusterMembers() {
List<Pod> pods = kubernetesClient.pods().inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
.withLabels(this.lockConfiguration.getClusterLabels()).list().getItems();
return pods.stream().map(pod -> pod.getMetadata().getName()).collect(Collectors.toSet());
}
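/**
 * Returns a value uniformly distributed in [num, num * factor), e.g. with
 * num=5000 and factor=1.2 the result lies between 5000 and 6000 ms.
 */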
private long jitter(long num, double factor) {
return (long)(num * (1 + Math.random() * (factor - 1)));
}
private String logPrefix() {
return "Pod[" + this.lockConfiguration.getPodName() + "]";
}
}