blob: 7ace336eea011577793677df984d167feb405bf5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.kubernetes.cluster;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.camel.CamelContext;
import org.apache.camel.cluster.CamelClusterMember;
import org.apache.camel.component.kubernetes.KubernetesConfiguration;
import org.apache.camel.component.kubernetes.KubernetesHelper;
import org.apache.camel.component.kubernetes.cluster.lock.KubernetesClusterEvent;
import org.apache.camel.component.kubernetes.cluster.lock.KubernetesLeadershipController;
import org.apache.camel.component.kubernetes.cluster.lock.KubernetesLockConfiguration;
import org.apache.camel.impl.cluster.AbstractCamelClusterView;
import org.apache.camel.util.ObjectHelper;
/**
* The cluster view on a specific Camel cluster namespace (not to be confused with Kubernetes namespaces).
* Namespaces are represented as keys in a Kubernetes ConfigMap (values are the current leader pods).
*/
public class KubernetesClusterView extends AbstractCamelClusterView {
private CamelContext camelContext;
private KubernetesClient kubernetesClient;
private KubernetesConfiguration configuration;
private KubernetesLockConfiguration lockConfiguration;
private KubernetesClusterMember localMember;
private Map<String, KubernetesClusterMember> memberCache;
private volatile Optional<CamelClusterMember> currentLeader = Optional.empty();
private volatile List<CamelClusterMember> currentMembers = Collections.emptyList();
private KubernetesLeadershipController controller;
public KubernetesClusterView(CamelContext camelContext, KubernetesClusterService cluster, KubernetesConfiguration configuration, KubernetesLockConfiguration lockConfiguration) {
super(cluster, lockConfiguration.getGroupName());
this.camelContext = ObjectHelper.notNull(camelContext, "camelContext");
this.configuration = ObjectHelper.notNull(configuration, "configuration");
this.lockConfiguration = ObjectHelper.notNull(lockConfiguration, "lockConfiguration");
this.localMember = new KubernetesClusterMember(lockConfiguration.getPodName());
this.memberCache = new HashMap<>();
}
@Override
public Optional<CamelClusterMember> getLeader() {
return currentLeader;
}
@Override
public CamelClusterMember getLocalMember() {
return localMember;
}
@Override
public List<CamelClusterMember> getMembers() {
return currentMembers;
}
@Override
protected void doStart() throws Exception {
if (controller == null) {
this.kubernetesClient = KubernetesHelper.getKubernetesClient(configuration);
controller = new KubernetesLeadershipController(camelContext, kubernetesClient, this.lockConfiguration, event -> {
if (event instanceof KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) {
// New leader
Optional<String> leader = KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(event).getData();
currentLeader = leader.map(this::toMember);
fireLeadershipChangedEvent(currentLeader);
} else if (event instanceof KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) {
Set<String> members = KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent.class.cast(event).getData();
Set<String> oldMembers = currentMembers.stream().map(CamelClusterMember::getId).collect(Collectors.toSet());
currentMembers = members.stream().map(this::toMember).collect(Collectors.toList());
// Computing differences
Set<String> added = new HashSet<>(members);
added.removeAll(oldMembers);
Set<String> removed = new HashSet<>(oldMembers);
removed.removeAll(members);
for (String id : added) {
fireMemberAddedEvent(toMember(id));
}
for (String id : removed) {
fireMemberRemovedEvent(toMember(id));
}
}
});
controller.start();
}
}
@Override
protected void doStop() throws Exception {
if (controller != null) {
controller.stop();
controller = null;
kubernetesClient.close();
kubernetesClient = null;
}
}
protected KubernetesClusterMember toMember(String name) {
if (name.equals(localMember.getId())) {
return localMember;
}
return memberCache.computeIfAbsent(name, KubernetesClusterMember::new);
}
class KubernetesClusterMember implements CamelClusterMember {
private String podName;
public KubernetesClusterMember(String podName) {
this.podName = ObjectHelper.notNull(podName, "podName");
}
@Override
public boolean isLeader() {
return currentLeader.isPresent() && currentLeader.get().getId().equals(podName);
}
@Override
public boolean isLocal() {
return ObjectHelper.equal(lockConfiguration.getPodName(), podName);
}
@Override
public String getId() {
return podName;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("KubernetesClusterMember{");
sb.append("podName='").append(podName).append('\'');
sb.append('}');
return sb.toString();
}
}
}