blob: 315b5f8814a0df9b72a2a7a49d95889e01f5ec2f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kube
import (
"context"
"fmt"
"net/http"
"time"
"github.com/apache/dubbo-kubernetes/pkg/cluster"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
"github.com/apache/dubbo-kubernetes/pkg/kube/informerfactory"
"github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
"github.com/apache/dubbo-kubernetes/pkg/lazy"
"github.com/apache/dubbo-kubernetes/pkg/sleep"
"go.uber.org/atomic"
istioclient "istio.io/client-go/pkg/clientset/versioned"
kubeExtClient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
kubeVersion "k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
dubbolog "github.com/apache/dubbo-kubernetes/pkg/log"
)
var log = dubbolog.RegisterScope("kube", "kube client debugging")
type client struct {
extSet kubeExtClient.Interface
config *rest.Config
revision string
clientFactory *clientFactory
version lazy.Lazy[*kubeVersion.Info]
informerFactory informerfactory.InformerFactory
dynamic dynamic.Interface
kube kubernetes.Interface
mapper meta.ResettableRESTMapper
metadata metadata.Interface
http *http.Client
objectFilter kubetypes.DynamicObjectFilter
clusterID cluster.ID
informerWatchesPending *atomic.Int32
started atomic.Bool
dubbo istioclient.Interface
crdWatcher kubetypes.CrdWatcher
fastSync bool
}
type Client interface {
RESTConfig() *rest.Config
Ext() kubeExtClient.Interface
Kube() kubernetes.Interface
Dynamic() dynamic.Interface
Metadata() metadata.Interface
Informers() informerfactory.InformerFactory
Dubbo() istioclient.Interface
ObjectFilter() kubetypes.DynamicObjectFilter
ClusterID() cluster.ID
CrdWatcher() kubetypes.CrdWatcher
RunAndWait(stop <-chan struct{}) bool
WaitForCacheSync(name string, stop <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool
Shutdown()
}
type CLIClient interface {
Client
DynamicClientFor(gvk schema.GroupVersionKind, obj *unstructured.Unstructured, namespace string) (dynamic.ResourceInterface, error)
}
type ClientOption func(cliClient CLIClient) CLIClient
func NewCLIClient(clientCfg clientcmd.ClientConfig, opts ...ClientOption) (CLIClient, error) {
return newClientInternal(newClientFactory(clientCfg, false), opts...)
}
func NewClient(clientCfg clientcmd.ClientConfig, cluster cluster.ID) (Client, error) {
return newClientInternal(newClientFactory(clientCfg, false), WithCluster(cluster))
}
func newClientInternal(clientFactory *clientFactory, opts ...ClientOption) (*client, error) {
var c client
var err error
c.clientFactory = clientFactory
c.config, err = clientFactory.ToRESTConfig()
if err != nil {
return nil, err
}
for _, opt := range opts {
opt(&c)
}
c.mapper, err = clientFactory.mapper.Get()
if err != nil {
return nil, err
}
c.informerFactory = informerfactory.NewSharedInformerFactory()
c.kube, err = kubernetes.NewForConfig(c.config)
if err != nil {
return nil, err
}
c.metadata, err = metadata.NewForConfig(c.config)
if err != nil {
return nil, err
}
c.dynamic, err = dynamic.NewForConfig(c.config)
if err != nil {
return nil, err
}
c.dubbo, err = istioclient.NewForConfig(c.config)
if err != nil {
return nil, err
}
c.extSet, err = kubeExtClient.NewForConfig(c.config)
if err != nil {
return nil, err
}
c.http = &http.Client{}
if c.config != nil && c.config.Timeout != 0 {
c.http.Timeout = c.config.Timeout
} else {
c.http.Timeout = time.Second * 15
}
var clientWithTimeout kubernetes.Interface
clientWithTimeout = c.kube
restConfig := c.RESTConfig()
if restConfig != nil {
if restConfig.Timeout == 0 {
restConfig.Timeout = time.Second * 5
}
kubeClient, err := kubernetes.NewForConfig(restConfig)
if err == nil {
clientWithTimeout = kubeClient
}
}
c.version = lazy.NewWithRetry(clientWithTimeout.Discovery().ServerVersion)
return &c, nil
}
func (c *client) RESTConfig() *rest.Config {
if c.config == nil {
return nil
}
cpy := *c.config
return &cpy
}
var (
_ Client = &client{}
_ CLIClient = &client{}
)
func EnableCrdWatcher(c Client) Client {
if NewCrdWatcher == nil {
panic("NewCrdWatcher is unset. Likely the crd watcher library is not imported anywhere")
}
if c.(*client).crdWatcher != nil {
panic("EnableCrdWatcher called twice for the same client")
}
c.(*client).crdWatcher = NewCrdWatcher(c)
return c
}
var NewCrdWatcher func(Client) kubetypes.CrdWatcher
func (c *client) Ext() kubeExtClient.Interface {
return c.extSet
}
func (c *client) Kube() kubernetes.Interface {
return c.kube
}
func (c *client) ClusterID() cluster.ID {
return c.clusterID
}
func (c *client) Dynamic() dynamic.Interface {
return c.dynamic
}
func (c *client) Metadata() metadata.Interface {
return c.metadata
}
func (c *client) Informers() informerfactory.InformerFactory {
return c.informerFactory
}
func (c *client) Dubbo() istioclient.Interface {
return c.dubbo
}
func (c *client) ObjectFilter() kubetypes.DynamicObjectFilter {
return c.objectFilter
}
func (c *client) CrdWatcher() kubetypes.CrdWatcher {
return c.crdWatcher
}
func (c *client) DynamicClientFor(gvk schema.GroupVersionKind, obj *unstructured.Unstructured, namespace string) (dynamic.ResourceInterface, error) {
gvr, namespaced := c.bestEffortToGVR(gvk, obj, namespace)
var dr dynamic.ResourceInterface
if namespaced {
ns := ""
if obj != nil {
ns = obj.GetNamespace()
}
if ns == "" {
ns = namespace
} else if namespace != "" && ns != namespace {
return nil, fmt.Errorf("object %v/%v provided namespace %q but apply called with %q", gvk, obj.GetName(), ns, namespace)
}
dr = c.dynamic.Resource(gvr).Namespace(ns)
} else {
dr = c.dynamic.Resource(gvr)
}
return dr, nil
}
func (c *client) WaitForCacheSync(name string, stop <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
if c.informerWatchesPending == nil {
return WaitForCacheSync(name, stop, cacheSyncs...)
}
syncFns := append(cacheSyncs, func() bool {
return c.informerWatchesPending.Load() == 0
})
return WaitForCacheSync(name, stop, syncFns...)
}
func (c *client) Run(stop <-chan struct{}) {
c.informerFactory.Start(stop)
if c.crdWatcher != nil {
go c.crdWatcher.Run(stop)
}
alreadyStarted := c.started.Swap(true)
if alreadyStarted {
log.Debugf("cluster %q kube client started again", c.clusterID)
} else {
log.Infof("cluster %q kube client started", c.clusterID)
}
}
func (c *client) RunAndWait(stop <-chan struct{}) bool {
c.Run(stop)
if c.fastSync {
if c.crdWatcher != nil {
if !c.WaitForCacheSync("crd watcher", stop, c.crdWatcher.HasSynced) {
return false
}
}
// WaitForCacheSync will virtually never be synced on the first call, as its called immediately after Start()
// This triggers a 100ms delay per call, which is often called 2-3 times in a test, delaying tests.
// Instead, we add an aggressive sync polling
if !fastWaitForCacheSync(stop, c.informerFactory) {
return false
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
<-stop
cancel()
}()
err := wait.PollUntilContextTimeout(ctx, time.Microsecond*100, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
select {
case <-stop:
return false, fmt.Errorf("channel closed")
case <-ctx.Done():
return false, ctx.Err()
default:
}
if c.informerWatchesPending.Load() == 0 {
return true, nil
}
return false, nil
})
return err == nil
}
if c.crdWatcher != nil {
if !c.WaitForCacheSync("crd watcher", stop, c.crdWatcher.HasSynced) {
return false
}
}
return c.informerFactory.WaitForCacheSync(stop)
}
func (c *client) bestEffortToGVR(gvk schema.GroupVersionKind, obj *unstructured.Unstructured, namespace string) (schema.GroupVersionResource, bool) {
if s, f := collections.All.FindByGroupVersionAliasesKind(config.FromKubernetesGVK(gvk)); f {
gvr := s.GroupVersionResource()
gvr.Version = gvk.Version
return gvr, !s.IsClusterScoped()
}
if c.mapper != nil {
mapping, err := c.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err == nil {
return mapping.Resource, mapping.Scope.Name() == meta.RESTScopeNameNamespace
}
}
gvr, _ := meta.UnsafeGuessKindToResource(gvk)
namespaced := (obj != nil && obj.GetNamespace() != "") || namespace != ""
return gvr, namespaced
}
func WaitForCacheSync(name string, stop <-chan struct{}, cacheSyncs ...cache.InformerSynced) (r bool) {
t0 := time.Now()
maximum := time.Millisecond * 100
delay := time.Millisecond
f := func() bool {
for _, syncFunc := range cacheSyncs {
if !syncFunc() {
return false
}
}
return true
}
attempt := 0
defer func() {
if r {
log.Infof("sync complete: name=%s, time=%v", name, time.Since(t0))
} else {
log.Infof("sync failed: name=%s, time=%v", name, time.Since(t0))
}
}()
for {
select {
case <-stop:
return false
default:
}
attempt++
res := f()
if res {
return true
}
delay *= 2
if delay > maximum {
delay = maximum
}
if attempt%50 == 0 {
// Log every 50th attempt (5s) at info, to avoid too much noisy
fmt.Printf("waiting for sync...: name=%s, time=%v\n", name, time.Since(t0))
}
if !sleep.Until(stop, delay) {
return false
}
}
}
func fastWaitForCacheSync(stop <-chan struct{}, informerFactory informerfactory.InformerFactory) bool {
returnImmediately := make(chan struct{})
close(returnImmediately)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
<-stop
cancel()
}()
err := wait.PollUntilContextTimeout(ctx, time.Microsecond*100, wait.ForeverTestTimeout, true, func(pollCtx context.Context) (bool, error) {
select {
case <-stop:
return false, fmt.Errorf("channel closed")
case <-pollCtx.Done():
return false, pollCtx.Err()
default:
}
return informerFactory.WaitForCacheSync(returnImmediately), nil
})
return err == nil
}
func (c *client) Shutdown() {
c.informerFactory.Shutdown()
}
func SetObjectFilter(c Client, filter kubetypes.DynamicObjectFilter) Client {
c.(*client).objectFilter = filter
return c
}
func WithCluster(id cluster.ID) ClientOption {
return func(c CLIClient) CLIClient {
client := c.(*client)
client.clusterID = id
return client
}
}
func WithRevision(revision string) ClientOption {
return func(cliClient CLIClient) CLIClient {
client := cliClient.(*client)
client.revision = revision
return client
}
}