blob: c9ac15dc221e99931d3366fa0467a83d5e5e5b35 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package multicluster
import (
"github.com/apache/dubbo-kubernetes/pkg/cluster"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
"github.com/apache/dubbo-kubernetes/pkg/kube"
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
)
type Controller struct {
namespace string
configClusterID cluster.ID
configCluster *Cluster
configClusterSyncers []ComponentConstraint
queue controllers.Queue
secrets kclient.Client[*corev1.Secret]
configOverrides []func(*rest.Config)
cs *ClusterStore
meshWatcher mesh.Watcher
handlers []handler
}
func NewController(kubeclientset kube.Client, namespace string, clusterID cluster.ID,
meshWatcher mesh.Watcher, configOverrides ...func(*rest.Config),
) *Controller {
controller := &Controller{
namespace: namespace,
configClusterID: clusterID,
configCluster: &Cluster{Client: kubeclientset, ID: clusterID},
configOverrides: configOverrides,
meshWatcher: meshWatcher,
}
return controller
}
func (c *Controller) Run(stopCh <-chan struct{}) error {
// run handlers for the config cluster; do not store this *Cluster in the ClusterStore or give it a SyncTimeout
// this is done outside the goroutine, we should block other Run/startFuncs until this is registered
c.configClusterSyncers = c.handleAdd(c.configCluster)
return nil
}
func (c *Controller) HasSynced() bool {
if !c.queue.HasSynced() {
return false
}
// Check all config cluster components are synced
// c.ConfigClusterHandler.HasSynced does not work; config cluster is handle specially
if !kube.AllSynced(c.configClusterSyncers) {
return false
}
// Check all remote clusters are synced (or timed out)
return c.cs.HasSynced()
}
func (c *Controller) handleAdd(cluster *Cluster) []ComponentConstraint {
syncers := make([]ComponentConstraint, 0, len(c.handlers))
for _, handler := range c.handlers {
syncers = append(syncers, handler.clusterAdded(cluster))
}
return syncers
}
func (c *Controller) handleDelete(key cluster.ID) {
for _, handler := range c.handlers {
handler.clusterDeleted(key)
}
}
type handler interface {
clusterAdded(cluster *Cluster) ComponentConstraint
clusterUpdated(cluster *Cluster) ComponentConstraint
clusterDeleted(clusterID cluster.ID)
HasSynced() bool
}
type ComponentBuilder interface {
registerHandler(h handler)
}
func BuildMultiClusterComponent[T ComponentConstraint](c ComponentBuilder, constructor func(cluster *Cluster) T) *Component[T] {
comp := &Component[T]{
constructor: constructor,
clusters: make(map[cluster.ID]T),
}
c.registerHandler(comp)
return comp
}
func (c *Controller) registerHandler(h handler) {
// Intentionally no lock. The controller today requires that handlers are registered before execution and not in parallel.
c.handlers = append(c.handlers, h)
}