/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package k8s
import (
	"context"
	"time"

	"github.com/pkg/errors"
	kube_core "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/meta"
	kube_runtime "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/rest"
	kube_ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	kube_client "sigs.k8s.io/controller-runtime/pkg/client"
	kube_manager "sigs.k8s.io/controller-runtime/pkg/manager"
	kube_metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
	kube_webhook "sigs.k8s.io/controller-runtime/pkg/webhook"

	config_core "github.com/apache/dubbo-kubernetes/pkg/config/core"
	"github.com/apache/dubbo-kubernetes/pkg/core"
	core_plugins "github.com/apache/dubbo-kubernetes/pkg/core/plugins"
	core_runtime "github.com/apache/dubbo-kubernetes/pkg/core/runtime"
	"github.com/apache/dubbo-kubernetes/pkg/core/runtime/component"
	k8s_common "github.com/apache/dubbo-kubernetes/pkg/plugins/common/k8s"
	k8s_extensions "github.com/apache/dubbo-kubernetes/pkg/plugins/extensions/k8s"
	"github.com/apache/dubbo-kubernetes/pkg/plugins/resources/k8s"
	"github.com/apache/dubbo-kubernetes/pkg/util/pointer"
)
var _ core_plugins.BootstrapPlugin = &plugin{}
var log = core.Log.WithName("plugins").WithName("bootstrap").WithName("k8s")
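// plugin is the Kubernetes bootstrap plugin. It wires a controller-runtime Manager
// and related Kubernetes extensions into the control-plane runtime.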
type plugin struct{}
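// init registers the plugin under the Kubernetes plugin name so the bootstrap sequence can discover it.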
func init() {
core_plugins.Register(core_plugins.Kubernetes, &plugin{})
}
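// BeforeBootstrap creates the controller-runtime Manager (with leader election and
// the admission webhook server), builds a dedicated client for Secrets, and registers
// both, together with a resource converter and a composite validator, as runtime extensions.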
func (p *plugin) BeforeBootstrap(b *core_runtime.Builder, cfg core_plugins.PluginConfig) error {
	// Both the semi-managed mode and the pure Kubernetes mode can use this single plugin.
if b.Config().DeployMode == config_core.UniversalMode {
return nil
}
scheme, err := NewScheme()
if err != nil {
return err
}
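	// GetConfigOrDie exits the process if neither an in-cluster config nor a kubeconfig is available.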
restClientConfig := kube_ctrl.GetConfigOrDie()
restClientConfig.QPS = float32(b.Config().Runtime.Kubernetes.ClientConfig.Qps)
restClientConfig.Burst = b.Config().Runtime.Kubernetes.ClientConfig.BurstQps
systemNamespace := b.Config().Store.Kubernetes.SystemNamespace
mgr, err := kube_ctrl.NewManager(
restClientConfig,
kube_ctrl.Options{
Scheme: scheme,
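			// Cache reads return shared objects without deep copies, so they must be treated as read-only.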
Cache: cache.Options{
DefaultUnsafeDisableDeepCopy: pointer.To(true),
},
// Admission WebHook Server
WebhookServer: kube_webhook.NewServer(kube_webhook.Options{
Host: b.Config().Runtime.Kubernetes.AdmissionServer.Address,
Port: int(b.Config().Runtime.Kubernetes.AdmissionServer.Port),
CertDir: b.Config().Runtime.Kubernetes.AdmissionServer.CertDir,
}),
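			// Leader election ensures that only one control-plane replica at a time runs leader-only Runnables.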
LeaderElection: true,
LeaderElectionID: "cp-leader-lease",
LeaderElectionNamespace: systemNamespace,
Logger: core.Log.WithName("kube-manager"),
LeaseDuration: &b.Config().Runtime.Kubernetes.LeaderElection.LeaseDuration.Duration,
RenewDeadline: &b.Config().Runtime.Kubernetes.LeaderElection.RenewDeadline.Duration,
// Disable metrics bind address as we use kube metrics registry directly.
Metrics: kube_metricsserver.Options{
BindAddress: "0",
},
},
)
if err != nil {
return err
}
secretClient, err := createSecretClient(b.AppCtx(), scheme, systemNamespace, restClientConfig, mgr.GetRESTMapper())
if err != nil {
return err
}
b.WithExtensions(k8s_extensions.NewManagerContext(b.Extensions(), mgr))
b.WithComponentManager(&kubeComponentManager{Manager: mgr})
b.WithExtensions(k8s_extensions.NewSecretClientContext(b.Extensions(), secretClient))
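	// Cache marshaled resources when an expiration time is configured; otherwise convert on every access.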
if expTime := b.Config().Runtime.Kubernetes.MarshalingCacheExpirationTime.Duration; expTime > 0 {
b.WithExtensions(k8s_extensions.NewResourceConverterContext(b.Extensions(), k8s.NewCachingConverter(expTime)))
} else {
b.WithExtensions(k8s_extensions.NewResourceConverterContext(b.Extensions(), k8s.NewSimpleConverter()))
}
b.WithExtensions(k8s_extensions.NewCompositeValidatorContext(b.Extensions(), &k8s_common.CompositeValidator{}))
return nil
}
// We need a separate client for Secrets because we don't have (get/list/watch) RBAC for all namespaces / cluster scope.
// Under the hood, the Kubernetes cache lists resources across all namespaces unless we specify a namespace in Options.
// If we try to use the regular cached client for Secrets, we see the following error: E1126 10:42:52.097662 1 reflector.go:178] pkg/mod/k8s.io/client-go@v0.18.9/tools/cache/reflector.go:125: Failed to list *v1.Secret: secrets is forbidden: User "system:serviceaccount:dubbo-system:dubbo-control-plane" cannot list resource "secrets" in API group "" at the cluster scope
// We cannot set this namespace parameter on the main cache in ControllerManager because it would affect all resources, so we need a separate client with its own cache for Secrets.
// The alternative was to use a non-cached client, but it had performance problems.
func createSecretClient(appCtx context.Context, scheme *kube_runtime.Scheme, systemNamespace string, config *rest.Config, restMapper meta.RESTMapper) (kube_client.Client, error) {
resyncPeriod := 10 * time.Hour // default resyncPeriod in Kubernetes
kubeCache, err := cache.New(config, cache.Options{
Scheme: scheme,
Mapper: restMapper,
SyncPeriod: &resyncPeriod,
DefaultNamespaces: map[string]cache.Config{systemNamespace: {}},
})
if err != nil {
return nil, err
}
	// We list Secrets by our custom "type", so we need to add an index on this field to the cache.
err = kubeCache.IndexField(appCtx, &kube_core.Secret{}, "type", func(object kube_client.Object) []string {
secret := object.(*kube_core.Secret)
return []string{string(secret.Type)}
})
if err != nil {
return nil, errors.Wrap(err, "could not add index of Secret cache by field 'type'")
}
	// According to the ControllerManager code, the cache needs to start before all the Runnables (our Components),
	// so we need a separate goroutine to start the cache and then wait for it to sync.
go func() {
if err := kubeCache.Start(appCtx); err != nil {
			// According to the implementation, no error is ever returned here; the error return only satisfies the Runnable contract.
log.Error(err, "could not start the secret k8s cache")
}
}()
if ok := kubeCache.WaitForCacheSync(appCtx); !ok {
		// ControllerManager ignores the case when WaitForCacheSync returns false.
		// It might be better to return an error and stop the Control Plane altogether, but we stick to logging it for now.
core.Log.Error(errors.New("could not sync secret cache"), "failed to wait for cache")
}
return kube_client.New(config, kube_client.Options{
Scheme: scheme,
Mapper: restMapper,
Cache: &kube_client.CacheOptions{
Reader: kubeCache,
},
})
}
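// AfterBootstrap is currently a no-op; it only satisfies the BootstrapPlugin interface.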
func (p *plugin) AfterBootstrap(b *core_runtime.Builder, _ core_plugins.PluginConfig) error {
if b.Config().DeployMode != config_core.KubernetesMode {
return nil
}
return nil
}
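// Name returns the name under which this plugin is registered.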
func (p *plugin) Name() core_plugins.PluginName {
return core_plugins.Kubernetes
}
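// Order places this plugin in the environment-preparing phase of the bootstrap sequence.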
func (p *plugin) Order() int {
return core_plugins.EnvironmentPreparingOrder
}
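// kubeComponentManager adapts the controller-runtime Manager to the component.Manager
// interface and keeps track of graceful components so shutdown can wait for them.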
type kubeComponentManager struct {
kube_ctrl.Manager
gracefulComponents []component.GracefulComponent
}
var _ component.Manager = &kubeComponentManager{}
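// Start translates the done channel into a context, runs the underlying controller-runtime
// Manager, and waits for all graceful components before returning.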
func (cm *kubeComponentManager) Start(done <-chan struct{}) error {
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel()
<-done
}()
defer cm.waitForDone()
if err := cm.Manager.Start(ctx); err != nil {
return errors.Wrap(err, "error running Kubernetes Manager")
}
return nil
}
// Extra compile-time check that component.Component implements LeaderElectionRunnable, so that leader election keeps working on Kubernetes when component.Component is refactored.
var _ kube_manager.LeaderElectionRunnable = component.ComponentFunc(func(i <-chan struct{}) error {
return nil
})
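// Add wraps every component in componentRunnableAdaptor before handing it to the
// controller-runtime Manager and remembers graceful components for shutdown.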
func (k *kubeComponentManager) Add(components ...component.Component) error {
for _, c := range components {
if gc, ok := c.(component.GracefulComponent); ok {
k.gracefulComponents = append(k.gracefulComponents, gc)
}
if err := k.Manager.Add(&componentRunnableAdaptor{Component: c}); err != nil {
return err
}
}
return nil
}
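// waitForDone blocks until every registered graceful component has finished.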
func (k *kubeComponentManager) waitForDone() {
for _, gc := range k.gracefulComponents {
gc.WaitForDone()
}
}
// This adaptor is required until component.Component takes a context as input.
type componentRunnableAdaptor struct {
component.Component
}
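// Start satisfies kube_manager.Runnable by converting the context into the done channel expected by component.Component.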
func (c componentRunnableAdaptor) Start(ctx context.Context) error {
return c.Component.Start(ctx.Done())
}
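// NeedLeaderElection defers to the wrapped component so per-component leader-election requirements are preserved.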
func (c componentRunnableAdaptor) NeedLeaderElection() bool {
return c.Component.NeedLeaderElection()
}
var (
_ kube_manager.LeaderElectionRunnable = &componentRunnableAdaptor{}
_ kube_manager.Runnable = &componentRunnableAdaptor{}
)