blob: ea96aff91b32358d4bb7a39f1b6a05bcf3a67442 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package client
import (
clientset ""
kubeExtClient ""
dubbo_cp ""
type KubeClient struct {
kubernetesClientSet kubernetes.Interface
kubeInformer informers.SharedInformerFactory
kubeConfig *rest.Config
dubboClientSet clientset.Interface
dubboInformer externalversions.SharedInformerFactory
extSet kubeExtClient.Interface
// only for test
fastSync bool
informerWatchesPending *atomic.Int32
func NewKubeClient() *KubeClient {
return &KubeClient{}
func (c *KubeClient) DubboInformer() externalversions.SharedInformerFactory {
return c.dubboInformer
func (c *KubeClient) Ext() kubeExtClient.Interface {
return c.extSet
func (k *KubeClient) DubboClientSet() clientset.Interface {
return k.dubboClientSet
func (k *KubeClient) GetKubeConfig() *rest.Config {
return k.kubeConfig
func (k *KubeClient) GetKubernetesClientSet() kubernetes.Interface {
return k.kubernetesClientSet
// Start runs the client's start-up wait; the only return visible here yields nil.
//
// In fast-sync mode (fastSync, test-only) it polls every microsecond, for at
// most wait.ForeverTestTimeout, until informerWatchesPending reads zero. The
// result of the poll is discarded, so a timeout is not reported to the caller.
//
// NOTE(review): stop is never referenced and the else branch holds no
// statements in this view; confirm that no informer start/sync calls are missing.
// NOTE(review): informerWatchesPending is a pointer that nothing visible in
// this file assigns; confirm it is set wherever fastSync is enabled, since
// calling Load on a nil pointer would panic.
// nolint
func (k *KubeClient) Start(stop <-chan struct{}) error {
if k.fastSync {
// WaitForCacheSync will virtually never be synced on the first call, as it's called immediately after Start()
// This triggers a 100ms delay per call, which is often called 2-3 times in a test, delaying tests.
// Instead, we add an aggressive sync polling
_ = wait.PollImmediate(time.Microsecond, wait.ForeverTestTimeout, func() (bool, error) {
// Done as soon as no informer watches are pending.
if k.informerWatchesPending.Load() == 0 {
return true, nil
return false, nil
} else {
return nil
func (k *KubeClient) NeedLeaderElection() bool {
return false
func (k *KubeClient) Init(options *dubbo_cp.Config) bool {
config, err := rest.InClusterConfig()
options.KubeConfig.InPodEnv = err == nil
kubeconfig := options.KubeConfig.KubeFileConfig
if err != nil {
logger.Sugar().Infof("Failed to load config from Pod. Will fall back to kube config file.")
// Read kubeconfig from command line
if len(kubeconfig) <= 0 {
// Read kubeconfig from env
kubeconfig = os.Getenv(clientcmd.RecommendedConfigPathEnvVar)
if len(kubeconfig) <= 0 {
// Read kubeconfig from home dir
if home := homedir.HomeDir(); home != "" {
kubeconfig = filepath.Join(home, ".kube", "config")
// use the current context in kubeconfig
logger.Sugar().Infof("Read kubeconfig from %s", kubeconfig)
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
logger.Sugar().Warnf("Failed to load config from kube config file.")
return false
// set qps and burst for rest config
config.QPS = float32(options.KubeConfig.RestConfigQps)
config.Burst = options.KubeConfig.RestConfigBurst
k.kubeConfig = config
// creates the client
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
logger.Sugar().Warnf("Failed to create clientgen to kubernetes. " + err.Error())
return false
if err != nil {
logger.Sugar().Warnf("Failed to create clientgen to kubernetes. " + err.Error())
return false
k.kubernetesClientSet = clientSet
genClient, err := clientset.NewForConfig(config)
if err != nil {
logger.Sugar().Warnf("Failed to create clientgen to kubernetes. " + err.Error())
return false
factory := externalversions.NewSharedInformerFactory(genClient, 0)
k.dubboInformer = factory
k.dubboClientSet = genClient
ext, err := kubeExtClient.NewForConfig(config)
if err != nil {
logger.Sugar().Warnf("Failed to create kubeExtClient to kubernetes. " + err.Error())
return false
k.extSet = ext
return true
// reflectInformerSync is the single method fastWaitForCacheSync needs from an
// informer factory: WaitForCacheSync returns a map, keyed by reflect.Type,
// whose boolean values say whether each entry has synced.
type reflectInformerSync interface {
	WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
}
// Wait for cache sync immediately, rather than with 100ms delay which slows tests
// See
// nolint
func fastWaitForCacheSync(informerFactory reflectInformerSync) {
returnImmediately := make(chan struct{})
_ = wait.PollImmediate(time.Microsecond, wait.ForeverTestTimeout, func() (bool, error) {
for _, synced := range informerFactory.WaitForCacheSync(returnImmediately) {
if !synced {
return false, nil
return true, nil