/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package controller

import (
	"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/config/kube/clustertrustbundle"
	"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/features"
	"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/keycertbundle"
	"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/leaderelection"
	"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
	"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/server"
	"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/serviceregistry/aggregate"
	"github.com/apache/dubbo-kubernetes/pkg/kube/multicluster"
)
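
// kubeController wraps the cluster-scoped Controller together with the shared
// MeshServiceController and a per-cluster stop channel, so that one cluster's
// registry can be torn down on its own when that cluster is removed.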
type kubeController struct {
	MeshServiceController *aggregate.Controller
	*Controller
	stop chan struct{}
}
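
// Close signals everything started for this cluster to shut down by closing
// the per-cluster stop channel.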
func (k *kubeController) Close() {
	close(k.stop)
}
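
// Multicluster structures the handling of multiple Kubernetes clusters: it
// builds one kubeController per cluster that joins the mesh and, when CA-cert
// distribution is enabled, runs the corresponding controllers under leader
// election.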
type Multicluster struct {
	// serverID of this planet instance, used for leader election.
	serverID string
	// options to use when creating kube controllers.
	opts             Options
	s                server.Instance
	clusterLocal     model.ClusterLocalProvider
	distributeCACert bool
	caBundleWatcher  *keycertbundle.Watcher
	revision         string
	component        *multicluster.Component[*kubeController]
}
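
// NewMulticluster registers a component with the given multicluster.Controller
// that builds (and tears down) a kubeController as clusters are added (and
// removed).
//
// A minimal wiring sketch; the variable names below stand in for whatever the
// surrounding server setup provides and are assumptions, not part of this
// package:
//
//	mc := NewMulticluster(serverID, opts, caBundleWatcher, revision,
//		true /* distributeCACert */, clusterLocal, s, mcController)
//	_ = mc // per-cluster registries are created by the component callback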
func NewMulticluster(
	serverID string,
	opts Options,
	caBundleWatcher *keycertbundle.Watcher,
	revision string,
	distributeCACert bool,
	clusterLocal model.ClusterLocalProvider,
	s server.Instance,
	controller *multicluster.Controller,
) *Multicluster {
	mc := &Multicluster{
		serverID:         serverID,
		opts:             opts,
		distributeCACert: distributeCACert,
		caBundleWatcher:  caBundleWatcher,
		revision:         revision,
		clusterLocal:     clusterLocal,
		s:                s,
	}
	mc.component = multicluster.BuildMultiClusterComponent(controller, func(cluster *multicluster.Cluster) *kubeController {
		stop := make(chan struct{})
		client := cluster.Client
		configCluster := opts.ClusterID == cluster.ID
		options := opts
		options.ClusterID = cluster.ID
		log.Infof("Initializing Kubernetes service registry %q", options.ClusterID)
		options.ConfigCluster = configCluster
		kubeRegistry := NewController(client, options)
		kubeController := &kubeController{
			MeshServiceController: opts.MeshServiceController,
			Controller:            kubeRegistry,
			stop:                  stop,
		}
		mc.initializeCluster(cluster, kubeController, kubeRegistry, options, configCluster, stop)
		return kubeController
	})
	return mc
}
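
// initializeCluster starts the registry for a newly added cluster and, when
// this instance distributes the CA cert, joins leader election so that only
// one planet instance per cluster runs the clustertrustbundle or namespace
// controller.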
func (m *Multicluster) initializeCluster(cluster *multicluster.Cluster, kubeController *kubeController, kubeRegistry *Controller,
	options Options, configCluster bool, clusterStopCh <-chan struct{},
) {
	client := cluster.Client
	// Run after the WorkloadHandler is added.
	m.opts.MeshServiceController.AddRegistryAndRun(kubeRegistry, clusterStopCh)
	go func() {
		var shouldLead bool
		if !configCluster {
			shouldLead = m.checkShouldLead()
			log.Infof("should join leader-election for cluster %s: %t", cluster.ID, shouldLead)
		}
		if m.distributeCACert && (shouldLead || configCluster) {
			if features.EnableClusterTrustBundles {
				// Block server exit on graceful termination of the leader controller.
				m.s.RunComponentAsyncAndWait("clustertrustbundle controller", func(_ <-chan struct{}) error {
					election := leaderelection.
						NewLeaderElectionMulticluster(options.SystemNamespace, m.serverID, leaderelection.NamespaceController, m.revision, !configCluster, client)
					// For the config cluster (single-node deployment), disable leader election
					// even if it is globally enabled: there is only one instance, so there is
					// nothing to elect.
					if configCluster {
						election.SetEnabled(false)
					}
					election.AddRunFunction(func(leaderStop <-chan struct{}) {
						log.Infof("starting clustertrustbundle controller for cluster %s", cluster.ID)
						c := clustertrustbundle.NewController(client, m.caBundleWatcher)
						client.RunAndWait(clusterStopCh)
						c.Run(leaderStop)
					})
					election.Run(clusterStopCh)
					return nil
				})
			} else {
				// Block server exit on graceful termination of the leader controller.
				m.s.RunComponentAsyncAndWait("namespace controller", func(_ <-chan struct{}) error {
					election := leaderelection.
						NewLeaderElectionMulticluster(options.SystemNamespace, m.serverID, leaderelection.NamespaceController, m.revision, !configCluster, client)
					// For the config cluster (single-node deployment), disable leader election
					// even if it is globally enabled: there is only one instance, so there is
					// nothing to elect.
					if configCluster {
						election.SetEnabled(false)
					}
					election.AddRunFunction(func(leaderStop <-chan struct{}) {
						log.Infof("starting namespace controller for cluster %s", cluster.ID)
						nc := NewNamespaceController(client, m.caBundleWatcher)
						// Start informers again. This fixes the case where the namespace informers
						// do not start, since we create them only after acquiring the leader lock.
						// Note: the stop channel here must be the overall planet stop, NOT the
						// leader-election stop. We are essentially lazy-loading the informer; if we
						// stopped it on losing the lock, we would never recreate it.
						client.RunAndWait(clusterStopCh)
						nc.Run(leaderStop)
					})
					election.Run(clusterStopCh)
					return nil
				})
			}
		}
	}()
}

// checkShouldLead reports whether this non-config-cluster instance should
// join leader election for CA-cert distribution. It is currently a stub that
// always returns false, so remote clusters never start those controllers.
func (m *Multicluster) checkShouldLead() bool {
	return false
}