| // Copyright Istio Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package kube |
| |
| import ( |
| "bytes" |
| "context" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "io" |
| "net/http" |
| "os" |
| "reflect" |
| "strings" |
| "sync" |
| "time" |
| ) |
| |
| import ( |
| "github.com/hashicorp/go-multierror" |
| "go.uber.org/atomic" |
| "golang.org/x/sync/errgroup" |
| "google.golang.org/grpc/credentials" |
| "istio.io/api/label" |
| clientextensions "istio.io/client-go/pkg/apis/extensions/v1alpha1" |
| clientnetworkingalpha "istio.io/client-go/pkg/apis/networking/v1alpha3" |
| clientnetworkingbeta "istio.io/client-go/pkg/apis/networking/v1beta1" |
| clientsecurity "istio.io/client-go/pkg/apis/security/v1beta1" |
| clienttelemetry "istio.io/client-go/pkg/apis/telemetry/v1alpha1" |
| istioclient "istio.io/client-go/pkg/clientset/versioned" |
| istiofake "istio.io/client-go/pkg/clientset/versioned/fake" |
| istioinformer "istio.io/client-go/pkg/informers/externalversions" |
| "istio.io/pkg/version" |
| v1 "k8s.io/api/core/v1" |
| apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" |
| kubeExtClient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" |
| extfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" |
| kubeExtInformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions" |
| "k8s.io/apimachinery/pkg/api/meta" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/runtime" |
| "k8s.io/apimachinery/pkg/runtime/schema" |
| utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| "k8s.io/apimachinery/pkg/util/wait" |
| kubeVersion "k8s.io/apimachinery/pkg/version" |
| "k8s.io/apimachinery/pkg/watch" |
| "k8s.io/cli-runtime/pkg/genericclioptions" |
| "k8s.io/cli-runtime/pkg/printers" |
| "k8s.io/cli-runtime/pkg/resource" |
| "k8s.io/client-go/discovery" |
| "k8s.io/client-go/dynamic" |
| "k8s.io/client-go/dynamic/dynamicinformer" |
| dynamicfake "k8s.io/client-go/dynamic/fake" |
| "k8s.io/client-go/informers" |
| "k8s.io/client-go/kubernetes" |
| "k8s.io/client-go/kubernetes/fake" |
| kubescheme "k8s.io/client-go/kubernetes/scheme" |
| "k8s.io/client-go/metadata" |
| metadatafake "k8s.io/client-go/metadata/fake" |
| "k8s.io/client-go/metadata/metadatainformer" |
| "k8s.io/client-go/rest" |
| clienttesting "k8s.io/client-go/testing" |
| "k8s.io/client-go/tools/cache" |
| "k8s.io/client-go/tools/clientcmd" |
| "k8s.io/client-go/tools/remotecommand" |
| "k8s.io/kubectl/pkg/cmd/apply" |
| kubectlDelete "k8s.io/kubectl/pkg/cmd/delete" |
| "k8s.io/kubectl/pkg/cmd/util" |
| gatewayapi "sigs.k8s.io/gateway-api/apis/v1alpha2" |
| gatewayapiclient "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned" |
| gatewayapifake "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned/fake" |
| gatewayapiinformer "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go-pixiu/operator/pkg/apis" |
| "github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk" |
| "github.com/apache/dubbo-go-pixiu/pkg/kube/mcs" |
| "github.com/apache/dubbo-go-pixiu/pkg/queue" |
| "github.com/apache/dubbo-go-pixiu/pkg/test/util/yml" |
| ) |
| |
| const ( |
| defaultLocalAddress = "localhost" |
| fieldManager = "istio-kube-client" |
| ) |
| |
| // Client is a helper for common Kubernetes client operations. This contains various different kubernetes |
| // clients using a shared config. It is expected that all of Istiod can share the same set of clients and |
| // informers. Sharing informers is especially important for load on the API server/Istiod itself. |
| type Client interface { |
| // TODO: stop embedding this, it will conflict with future additions. Use Kube() instead is preferred |
| kubernetes.Interface |
| // RESTConfig returns the Kubernetes rest.Config used to configure the clients. |
| RESTConfig() *rest.Config |
| |
| // Ext returns the API extensions client. |
| Ext() kubeExtClient.Interface |
| |
| // Kube returns the core kube client |
| Kube() kubernetes.Interface |
| |
| // Dynamic client. |
| Dynamic() dynamic.Interface |
| |
| // Metadata returns the Metadata kube client. |
| Metadata() metadata.Interface |
| |
| // Istio returns the Istio kube client. |
| Istio() istioclient.Interface |
| |
| // GatewayAPI returns the gateway-api kube client. |
| GatewayAPI() gatewayapiclient.Interface |
| |
| // KubeInformer returns an informer for core kube client |
| KubeInformer() informers.SharedInformerFactory |
| |
| // DynamicInformer returns an informer for dynamic client |
| DynamicInformer() dynamicinformer.DynamicSharedInformerFactory |
| |
| // MetadataInformer returns an informer for metadata client |
| MetadataInformer() metadatainformer.SharedInformerFactory |
| |
| // IstioInformer returns an informer for the istio client |
| IstioInformer() istioinformer.SharedInformerFactory |
| |
| // GatewayAPIInformer returns an informer for the gateway-api client |
| GatewayAPIInformer() gatewayapiinformer.SharedInformerFactory |
| |
| // ExtInformer returns an informer for the extension client |
| ExtInformer() kubeExtInformers.SharedInformerFactory |
| |
| // RunAndWait starts all informers and waits for their caches to sync. |
| // Warning: this must be called AFTER .Informer() is called, which will register the informer. |
| RunAndWait(stop <-chan struct{}) |
| |
| // GetKubernetesVersion returns the Kubernetes server version |
| GetKubernetesVersion() (*kubeVersion.Info, error) |
| } |
| |
| // ExtendedClient is an extended client with additional helpers/functionality for Istioctl and testing. |
| type ExtendedClient interface { |
| Client |
| // Revision of the Istio control plane. |
| Revision() string |
| |
| // EnvoyDo makes an http request to the Envoy in the specified pod. |
| EnvoyDo(ctx context.Context, podName, podNamespace, method, path string) ([]byte, error) |
| |
| // EnvoyDoWithPort makes an http request to the Envoy in the specified pod and port. |
| EnvoyDoWithPort(ctx context.Context, podName, podNamespace, method, path string, port int) ([]byte, error) |
| |
| // AllDiscoveryDo makes an http request to each Istio discovery instance. |
| AllDiscoveryDo(ctx context.Context, namespace, path string) (map[string][]byte, error) |
| |
| // GetIstioVersions gets the version for each Istio control plane component. |
| GetIstioVersions(ctx context.Context, namespace string) (*version.MeshInfo, error) |
| |
| // PodsForSelector finds pods matching selector. |
| PodsForSelector(ctx context.Context, namespace string, labelSelectors ...string) (*v1.PodList, error) |
| |
| // GetIstioPods retrieves the pod objects for Istio deployments |
| GetIstioPods(ctx context.Context, namespace string, params map[string]string) ([]v1.Pod, error) |
| |
| // PodExecCommands takes a list of commands and the pod data to run the commands in the specified pod. |
| PodExecCommands(podName, podNamespace, container string, commands []string) (stdout string, stderr string, err error) |
| |
| // PodExec takes a command and the pod data to run the command in the specified pod. |
| PodExec(podName, podNamespace, container string, command string) (stdout string, stderr string, err error) |
| |
| // PodLogs retrieves the logs for the given pod. |
| PodLogs(ctx context.Context, podName string, podNamespace string, container string, previousLog bool) (string, error) |
| |
| // NewPortForwarder creates a new PortForwarder configured for the given pod. If localPort=0, a port will be |
| // dynamically selected. If localAddress is empty, "localhost" is used. |
| NewPortForwarder(podName string, ns string, localAddress string, localPort int, podPort int) (PortForwarder, error) |
| |
| // ApplyYAMLFiles applies the resources in the given YAML files. |
| ApplyYAMLFiles(namespace string, yamlFiles ...string) error |
| |
| // ApplyYAMLFilesDryRun performs a dry run for applying the resource in the given YAML files |
| ApplyYAMLFilesDryRun(namespace string, yamlFiles ...string) error |
| |
| // DeleteYAMLFiles deletes the resources in the given YAML files. |
| DeleteYAMLFiles(namespace string, yamlFiles ...string) error |
| |
| // DeleteYAMLFilesDryRun performs a dry run for deleting the resources in the given YAML files. |
| DeleteYAMLFilesDryRun(namespace string, yamlFiles ...string) error |
| |
| // CreatePerRPCCredentials creates a gRPC bearer token provider that can create (and renew!) Istio tokens |
| CreatePerRPCCredentials(ctx context.Context, tokenNamespace, tokenServiceAccount string, audiences []string, |
| expirationSeconds int64) (credentials.PerRPCCredentials, error) |
| |
| // UtilFactory returns a kubectl factory |
| UtilFactory() util.Factory |
| } |
| |
| var ( |
| _ Client = &client{} |
| _ ExtendedClient = &client{} |
| ) |
| |
| const resyncInterval = 0 |
| |
| // NewFakeClient creates a new, fake, client |
| func NewFakeClient(objects ...runtime.Object) ExtendedClient { |
| c := &client{ |
| informerWatchesPending: atomic.NewInt32(0), |
| } |
| c.Interface = fake.NewSimpleClientset(objects...) |
| c.kube = c.Interface |
| c.kubeInformer = informers.NewSharedInformerFactory(c.Interface, resyncInterval) |
| s := FakeIstioScheme |
| |
| c.metadata = metadatafake.NewSimpleMetadataClient(s) |
| c.metadataInformer = metadatainformer.NewSharedInformerFactory(c.metadata, resyncInterval) |
| // Support some galley tests using basicmetadata |
| // If you are adding something to this list, consider other options like adding to the scheme. |
| gvrToListKind := map[schema.GroupVersionResource]string{ |
| {Group: "testdata.istio.io", Version: "v1alpha1", Resource: "Kind1s"}: "Kind1List", |
| } |
| c.dynamic = dynamicfake.NewSimpleDynamicClientWithCustomListKinds(s, gvrToListKind) |
| c.dynamicInformer = dynamicinformer.NewDynamicSharedInformerFactory(c.dynamic, resyncInterval) |
| |
| c.istio = istiofake.NewSimpleClientset() |
| c.istioInformer = istioinformer.NewSharedInformerFactoryWithOptions(c.istio, resyncInterval) |
| |
| c.gatewayapi = gatewayapifake.NewSimpleClientset() |
| c.gatewayapiInformer = gatewayapiinformer.NewSharedInformerFactory(c.gatewayapi, resyncInterval) |
| |
| c.extSet = extfake.NewSimpleClientset() |
| c.extInformer = kubeExtInformers.NewSharedInformerFactory(c.extSet, resyncInterval) |
| |
| // https://github.com/kubernetes/kubernetes/issues/95372 |
| // There is a race condition in the client fakes, where events that happen between the List and Watch |
| // of an informer are dropped. To avoid this, we explicitly manage the list and watch, ensuring all lists |
| // have an associated watch before continuing. |
| // This would likely break any direct calls to List(), but for now our tests don't do that anyways. If we need |
| // to in the future we will need to identify the Lists that have a corresponding Watch, possibly by looking |
| // at created Informers |
| // an atomic.Int is used instead of sync.WaitGroup because wg.Add and wg.Wait cannot be called concurrently |
| listReactor := func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) { |
| c.informerWatchesPending.Inc() |
| return false, nil, nil |
| } |
| watchReactor := func(tracker clienttesting.ObjectTracker) func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) { |
| return func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) { |
| gvr := action.GetResource() |
| ns := action.GetNamespace() |
| watch, err := tracker.Watch(gvr, ns) |
| if err != nil { |
| return false, nil, err |
| } |
| c.informerWatchesPending.Dec() |
| return true, watch, nil |
| } |
| } |
| for _, fc := range []fakeClient{ |
| c.kube.(*fake.Clientset), |
| c.istio.(*istiofake.Clientset), |
| c.gatewayapi.(*gatewayapifake.Clientset), |
| c.dynamic.(*dynamicfake.FakeDynamicClient), |
| // TODO: send PR to client-go to add Tracker() |
| // c.metadata.(*metadatafake.FakeMetadataClient), |
| } { |
| fc.PrependReactor("list", "*", listReactor) |
| fc.PrependWatchReactor("*", watchReactor(fc.Tracker())) |
| } |
| |
| // discoveryv1/EndpontSlices readable from discoveryv1beta1/EndpointSlices |
| c.mirrorQueue = queue.NewQueue(1 * time.Second) |
| mirrorResource( |
| c.mirrorQueue, |
| c.kubeInformer.Discovery().V1().EndpointSlices().Informer(), |
| c.kube.DiscoveryV1beta1().EndpointSlices, |
| endpointSliceV1toV1beta1, |
| ) |
| |
| c.fastSync = true |
| |
| return c |
| } |
| |
| func NewFakeClientWithVersion(minor string, objects ...runtime.Object) ExtendedClient { |
| c := NewFakeClient(objects...).(*client) |
| if minor != "" && minor != "latest" { |
| c.versionOnce.Do(func() { |
| c.version = &kubeVersion.Info{Major: "1", Minor: minor, GitVersion: fmt.Sprintf("v1.%v.0", minor)} |
| }) |
| } |
| return c |
| } |
| |
| type fakeClient interface { |
| PrependReactor(verb, resource string, reaction clienttesting.ReactionFunc) |
| PrependWatchReactor(resource string, reaction clienttesting.WatchReactionFunc) |
| Tracker() clienttesting.ObjectTracker |
| } |
| |
| // Client is a helper wrapper around the Kube RESTClient for istioctl -> Pilot/Envoy/Mesh related things |
| type client struct { |
| kubernetes.Interface |
| |
| clientFactory util.Factory |
| config *rest.Config |
| |
| extSet kubeExtClient.Interface |
| extInformer kubeExtInformers.SharedInformerFactory |
| |
| kube kubernetes.Interface |
| kubeInformer informers.SharedInformerFactory |
| |
| dynamic dynamic.Interface |
| dynamicInformer dynamicinformer.DynamicSharedInformerFactory |
| |
| metadata metadata.Interface |
| metadataInformer metadatainformer.SharedInformerFactory |
| |
| istio istioclient.Interface |
| istioInformer istioinformer.SharedInformerFactory |
| |
| gatewayapi gatewayapiclient.Interface |
| gatewayapiInformer gatewayapiinformer.SharedInformerFactory |
| |
| // If enable, will wait for cache syncs with extremely short delay. This should be used only for tests |
| fastSync bool |
| informerWatchesPending *atomic.Int32 |
| |
| mirrorQueue queue.Instance |
| mirrorQueueStarted atomic.Bool |
| |
| // These may be set only when creating an extended client. |
| revision string |
| restClient *rest.RESTClient |
| discoveryClient discovery.CachedDiscoveryInterface |
| mapper meta.RESTMapper |
| |
| versionOnce sync.Once |
| version *kubeVersion.Info |
| } |
| |
| // newClientInternal creates a Kubernetes client from the given factory. |
| func newClientInternal(clientFactory util.Factory, revision string) (*client, error) { |
| var c client |
| var err error |
| |
| c.clientFactory = clientFactory |
| |
| c.config, err = clientFactory.ToRESTConfig() |
| if err != nil { |
| return nil, err |
| } |
| |
| c.revision = revision |
| |
| c.restClient, err = clientFactory.RESTClient() |
| if err != nil { |
| return nil, err |
| } |
| |
| c.discoveryClient, err = clientFactory.ToDiscoveryClient() |
| if err != nil { |
| return nil, err |
| } |
| c.mapper, err = clientFactory.ToRESTMapper() |
| if err != nil { |
| return nil, err |
| } |
| |
| c.Interface, err = kubernetes.NewForConfig(c.config) |
| c.kube = c.Interface |
| if err != nil { |
| return nil, err |
| } |
| c.kubeInformer = informers.NewSharedInformerFactory(c.Interface, resyncInterval) |
| |
| c.metadata, err = metadata.NewForConfig(c.config) |
| if err != nil { |
| return nil, err |
| } |
| c.metadataInformer = metadatainformer.NewSharedInformerFactory(c.metadata, resyncInterval) |
| |
| c.dynamic, err = dynamic.NewForConfig(c.config) |
| if err != nil { |
| return nil, err |
| } |
| c.dynamicInformer = dynamicinformer.NewDynamicSharedInformerFactory(c.dynamic, resyncInterval) |
| |
| c.istio, err = istioclient.NewForConfig(c.config) |
| if err != nil { |
| return nil, err |
| } |
| c.istioInformer = istioinformer.NewSharedInformerFactory(c.istio, resyncInterval) |
| |
| c.gatewayapi, err = gatewayapiclient.NewForConfig(c.config) |
| if err != nil { |
| return nil, err |
| } |
| c.gatewayapiInformer = gatewayapiinformer.NewSharedInformerFactory(c.gatewayapi, resyncInterval) |
| |
| c.extSet, err = kubeExtClient.NewForConfig(c.config) |
| if err != nil { |
| return nil, err |
| } |
| c.extInformer = kubeExtInformers.NewSharedInformerFactory(c.extSet, resyncInterval) |
| |
| return &c, nil |
| } |
| |
| // NewDefaultClient returns a default client, using standard Kubernetes config resolution to determine |
| // the cluster to access. |
| func NewDefaultClient() (ExtendedClient, error) { |
| return NewExtendedClient(BuildClientCmd("", ""), "") |
| } |
| |
| // NewExtendedClient creates a Kubernetes client from the given ClientConfig. The "revision" parameter |
| // controls the behavior of GetIstioPods, by selecting a specific revision of the control plane. |
| func NewExtendedClient(clientConfig clientcmd.ClientConfig, revision string) (ExtendedClient, error) { |
| return newClientInternal(newClientFactory(clientConfig), revision) |
| } |
| |
| // NewClient creates a Kubernetes client from the given rest config. |
| func NewClient(clientConfig clientcmd.ClientConfig) (Client, error) { |
| return newClientInternal(newClientFactory(clientConfig), "") |
| } |
| |
| func (c *client) RESTConfig() *rest.Config { |
| if c.config == nil { |
| return nil |
| } |
| cpy := *c.config |
| return &cpy |
| } |
| |
| func (c *client) Ext() kubeExtClient.Interface { |
| return c.extSet |
| } |
| |
| func (c *client) Dynamic() dynamic.Interface { |
| return c.dynamic |
| } |
| |
| func (c *client) Kube() kubernetes.Interface { |
| return c.kube |
| } |
| |
| func (c *client) Metadata() metadata.Interface { |
| return c.metadata |
| } |
| |
| func (c *client) Istio() istioclient.Interface { |
| return c.istio |
| } |
| |
| func (c *client) GatewayAPI() gatewayapiclient.Interface { |
| return c.gatewayapi |
| } |
| |
| func (c *client) KubeInformer() informers.SharedInformerFactory { |
| return c.kubeInformer |
| } |
| |
| func (c *client) DynamicInformer() dynamicinformer.DynamicSharedInformerFactory { |
| return c.dynamicInformer |
| } |
| |
| func (c *client) MetadataInformer() metadatainformer.SharedInformerFactory { |
| return c.metadataInformer |
| } |
| |
| func (c *client) IstioInformer() istioinformer.SharedInformerFactory { |
| return c.istioInformer |
| } |
| |
| func (c *client) GatewayAPIInformer() gatewayapiinformer.SharedInformerFactory { |
| return c.gatewayapiInformer |
| } |
| |
| func (c *client) ExtInformer() kubeExtInformers.SharedInformerFactory { |
| return c.extInformer |
| } |
| |
| // RunAndWait starts all informers and waits for their caches to sync. |
| // Warning: this must be called AFTER .Informer() is called, which will register the informer. |
| func (c *client) RunAndWait(stop <-chan struct{}) { |
| if c.mirrorQueue != nil && !c.mirrorQueueStarted.Load() { |
| c.mirrorQueueStarted.Store(true) |
| go c.mirrorQueue.Run(stop) |
| } |
| c.kubeInformer.Start(stop) |
| c.dynamicInformer.Start(stop) |
| c.metadataInformer.Start(stop) |
| c.istioInformer.Start(stop) |
| c.gatewayapiInformer.Start(stop) |
| c.extInformer.Start(stop) |
| if c.fastSync { |
| // WaitForCacheSync will virtually never be synced on the first call, as its called immediately after Start() |
| // This triggers a 100ms delay per call, which is often called 2-3 times in a test, delaying tests. |
| // Instead, we add an aggressive sync polling |
| fastWaitForCacheSync(stop, c.kubeInformer) |
| fastWaitForCacheSyncDynamic(stop, c.dynamicInformer) |
| fastWaitForCacheSyncDynamic(stop, c.metadataInformer) |
| fastWaitForCacheSync(stop, c.istioInformer) |
| fastWaitForCacheSync(stop, c.gatewayapiInformer) |
| fastWaitForCacheSync(stop, c.extInformer) |
| _ = wait.PollImmediate(time.Microsecond*100, wait.ForeverTestTimeout, func() (bool, error) { |
| select { |
| case <-stop: |
| return false, fmt.Errorf("channel closed") |
| default: |
| } |
| if c.informerWatchesPending.Load() == 0 { |
| return true, nil |
| } |
| return false, nil |
| }) |
| } else { |
| c.kubeInformer.WaitForCacheSync(stop) |
| c.dynamicInformer.WaitForCacheSync(stop) |
| c.metadataInformer.WaitForCacheSync(stop) |
| c.istioInformer.WaitForCacheSync(stop) |
| c.gatewayapiInformer.WaitForCacheSync(stop) |
| c.extInformer.WaitForCacheSync(stop) |
| } |
| } |
| |
| func (c *client) GetKubernetesVersion() (*kubeVersion.Info, error) { |
| c.versionOnce.Do(func() { |
| v, err := c.Discovery().ServerVersion() |
| if err == nil { |
| c.version = v |
| } |
| }) |
| if c.version != nil { |
| return c.version, nil |
| } |
| // Initial attempt failed, retry on each call to this function |
| v, err := c.Discovery().ServerVersion() |
| if err != nil { |
| c.version = v |
| } |
| return c.version, err |
| } |
| |
| type reflectInformerSync interface { |
| WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool |
| } |
| |
| type dynamicInformerSync interface { |
| WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool |
| } |
| |
| // Wait for cache sync immediately, rather than with 100ms delay which slows tests |
| // See https://github.com/kubernetes/kubernetes/issues/95262#issuecomment-703141573 |
| func fastWaitForCacheSync(stop <-chan struct{}, informerFactory reflectInformerSync) { |
| returnImmediately := make(chan struct{}) |
| close(returnImmediately) |
| _ = wait.PollImmediate(time.Microsecond*100, wait.ForeverTestTimeout, func() (bool, error) { |
| select { |
| case <-stop: |
| return false, fmt.Errorf("channel closed") |
| default: |
| } |
| for _, synced := range informerFactory.WaitForCacheSync(returnImmediately) { |
| if !synced { |
| return false, nil |
| } |
| } |
| return true, nil |
| }) |
| } |
| |
| func fastWaitForCacheSyncDynamic(stop <-chan struct{}, informerFactory dynamicInformerSync) { |
| returnImmediately := make(chan struct{}) |
| close(returnImmediately) |
| _ = wait.PollImmediate(time.Microsecond*100, wait.ForeverTestTimeout, func() (bool, error) { |
| select { |
| case <-stop: |
| return false, fmt.Errorf("channel closed") |
| default: |
| } |
| for _, synced := range informerFactory.WaitForCacheSync(returnImmediately) { |
| if !synced { |
| return false, nil |
| } |
| } |
| return true, nil |
| }) |
| } |
| |
| // WaitForCacheSyncInterval waits for caches to populate, with explicitly configured interval |
| func WaitForCacheSyncInterval(stopCh <-chan struct{}, interval time.Duration, cacheSyncs ...cache.InformerSynced) bool { |
| err := wait.PollImmediateUntil(interval, |
| func() (bool, error) { |
| for _, syncFunc := range cacheSyncs { |
| if !syncFunc() { |
| return false, nil |
| } |
| } |
| return true, nil |
| }, |
| stopCh) |
| return err == nil |
| } |
| |
| func (c *client) Revision() string { |
| return c.revision |
| } |
| |
| func (c *client) PodExecCommands(podName, podNamespace, container string, commands []string) (stdout, stderr string, err error) { |
| defer func() { |
| if err != nil { |
| if len(stderr) > 0 { |
| err = fmt.Errorf("error exec'ing into %s/%s %s container: %v\n%s", |
| podNamespace, podName, container, err, stderr) |
| } else { |
| err = fmt.Errorf("error exec'ing into %s/%s %s container: %v", |
| podNamespace, podName, container, err) |
| } |
| } |
| }() |
| |
| req := c.restClient.Post(). |
| Resource("pods"). |
| Name(podName). |
| Namespace(podNamespace). |
| SubResource("exec"). |
| Param("container", container). |
| VersionedParams(&v1.PodExecOptions{ |
| Container: container, |
| Command: commands, |
| Stdin: false, |
| Stdout: true, |
| Stderr: true, |
| TTY: false, |
| }, kubescheme.ParameterCodec) |
| |
| wrapper, upgrader, err := roundTripperFor(c.config) |
| if err != nil { |
| return "", "", err |
| } |
| exec, err := remotecommand.NewSPDYExecutorForTransports(wrapper, upgrader, "POST", req.URL()) |
| if err != nil { |
| return "", "", err |
| } |
| |
| var stdoutBuf, stderrBuf bytes.Buffer |
| err = exec.Stream(remotecommand.StreamOptions{ |
| Stdin: nil, |
| Stdout: &stdoutBuf, |
| Stderr: &stderrBuf, |
| Tty: false, |
| }) |
| |
| stdout = stdoutBuf.String() |
| stderr = stderrBuf.String() |
| return |
| } |
| |
| func (c *client) PodExec(podName, podNamespace, container string, command string) (stdout, stderr string, err error) { |
| commandFields := strings.Fields(command) |
| return c.PodExecCommands(podName, podNamespace, container, commandFields) |
| } |
| |
| func (c *client) PodLogs(ctx context.Context, podName, podNamespace, container string, previousLog bool) (string, error) { |
| opts := &v1.PodLogOptions{ |
| Container: container, |
| Previous: previousLog, |
| } |
| res, err := c.CoreV1().Pods(podNamespace).GetLogs(podName, opts).Stream(ctx) |
| if err != nil { |
| return "", err |
| } |
| defer closeQuietly(res) |
| |
| builder := &strings.Builder{} |
| if _, err = io.Copy(builder, res); err != nil { |
| return "", err |
| } |
| |
| return builder.String(), nil |
| } |
| |
| func (c *client) AllDiscoveryDo(ctx context.Context, istiodNamespace, path string) (map[string][]byte, error) { |
| istiods, err := c.GetIstioPods(ctx, istiodNamespace, map[string]string{ |
| "labelSelector": "app=istiod", |
| "fieldSelector": "status.phase=Running", |
| }) |
| if err != nil { |
| return nil, err |
| } |
| if len(istiods) == 0 { |
| return nil, errors.New("unable to find any Istiod instances") |
| } |
| |
| result := map[string][]byte{} |
| for _, istiod := range istiods { |
| res, err := c.portForwardRequest(ctx, istiod.Name, istiod.Namespace, http.MethodGet, path, 15014) |
| if err != nil { |
| return nil, err |
| } |
| if len(res) > 0 { |
| result[istiod.Name] = res |
| } |
| } |
| // If any Discovery servers responded, treat as a success |
| if len(result) > 0 { |
| return result, nil |
| } |
| return nil, nil |
| } |
| |
| func (c *client) EnvoyDo(ctx context.Context, podName, podNamespace, method, path string) ([]byte, error) { |
| return c.portForwardRequest(ctx, podName, podNamespace, method, path, 15000) |
| } |
| |
| func (c *client) EnvoyDoWithPort(ctx context.Context, podName, podNamespace, method, path string, port int) ([]byte, error) { |
| return c.portForwardRequest(ctx, podName, podNamespace, method, path, port) |
| } |
| |
| func (c *client) portForwardRequest(ctx context.Context, podName, podNamespace, method, path string, port int) ([]byte, error) { |
| formatError := func(err error) error { |
| return fmt.Errorf("failure running port forward process: %v", err) |
| } |
| |
| fw, err := c.NewPortForwarder(podName, podNamespace, "127.0.0.1", 0, port) |
| if err != nil { |
| return nil, err |
| } |
| if err = fw.Start(); err != nil { |
| return nil, formatError(err) |
| } |
| defer fw.Close() |
| req, err := http.NewRequest(method, fmt.Sprintf("http://%s/%s", fw.Address(), path), nil) |
| if err != nil { |
| return nil, formatError(err) |
| } |
| resp, err := http.DefaultClient.Do(req.WithContext(ctx)) |
| if err != nil { |
| return nil, formatError(err) |
| } |
| defer closeQuietly(resp.Body) |
| out, err := io.ReadAll(resp.Body) |
| if err != nil { |
| return nil, formatError(err) |
| } |
| |
| return out, nil |
| } |
| |
| func (c *client) GetIstioPods(ctx context.Context, namespace string, params map[string]string) ([]v1.Pod, error) { |
| if c.revision != "" { |
| labelSelector, ok := params["labelSelector"] |
| if ok { |
| params["labelSelector"] = fmt.Sprintf("%s,%s=%s", labelSelector, label.IoIstioRev.Name, c.revision) |
| } else { |
| params["labelSelector"] = fmt.Sprintf("%s=%s", label.IoIstioRev.Name, c.revision) |
| } |
| } |
| |
| req := c.restClient.Get(). |
| Resource("pods"). |
| Namespace(namespace) |
| for k, v := range params { |
| req.Param(k, v) |
| } |
| |
| res := req.Do(ctx) |
| if res.Error() != nil { |
| return nil, fmt.Errorf("unable to retrieve Pods: %v", res.Error()) |
| } |
| list := &v1.PodList{} |
| if err := res.Into(list); err != nil { |
| return nil, fmt.Errorf("unable to parse PodList: %v", res.Error()) |
| } |
| return list.Items, nil |
| } |
| |
| // ExtractExecResult wraps PodExec and return the execution result and error if has any. |
| func (c *client) extractExecResult(podName, podNamespace, container, cmd string) (string, error) { |
| stdout, stderr, err := c.PodExec(podName, podNamespace, container, cmd) |
| if err != nil { |
| if stderr != "" { |
| return "", fmt.Errorf("error exec'ing into %s/%s %s container: %w\n%s", podNamespace, podName, container, err, stderr) |
| } |
| return "", fmt.Errorf("error exec'ing into %s/%s %s container: %w", podNamespace, podName, container, err) |
| } |
| return stdout, nil |
| } |
| |
| func (c *client) GetIstioVersions(ctx context.Context, namespace string) (*version.MeshInfo, error) { |
| pods, err := c.GetIstioPods(ctx, namespace, map[string]string{ |
| "labelSelector": "app=istiod", |
| "fieldSelector": "status.phase=Running", |
| }) |
| if err != nil { |
| return nil, err |
| } |
| if len(pods) == 0 { |
| return nil, fmt.Errorf("no running Istio pods in %q", namespace) |
| } |
| |
| var errs error |
| res := version.MeshInfo{} |
| for _, pod := range pods { |
| component := pod.Labels["istio"] |
| server := version.ServerInfo{Component: component} |
| |
| // :15014/version returns something like |
| // 1.7-alpha.9c900ba74d10a1affe7c23557ef0eebd6103b03c-9c900ba74d10a1affe7c23557ef0eebd6103b03c-Clean |
| result, err := c.CoreV1().Pods(pod.Namespace).ProxyGet("", pod.Name, "15014", "/version", nil).DoRaw(ctx) |
| if err != nil { |
| bi, execErr := c.getIstioVersionUsingExec(&pod) |
| if execErr != nil { |
| errs = multierror.Append(errs, |
| fmt.Errorf("error port-forwarding into %s.%s: %v", pod.Namespace, pod.Name, err), |
| execErr, |
| ) |
| continue |
| } |
| server.Info = *bi |
| res = append(res, server) |
| continue |
| } |
| if len(result) > 0 { |
| setServerInfoWithIstiodVersionInfo(&server.Info, string(result)) |
| // (Golang version not available through :15014/version endpoint) |
| |
| res = append(res, server) |
| } |
| } |
| return &res, errs |
| } |
| |
| func (c *client) getIstioVersionUsingExec(pod *v1.Pod) (*version.BuildInfo, error) { |
| // exclude data plane components from control plane list |
| labelToPodDetail := map[string]struct { |
| binary string |
| container string |
| }{ |
| "pilot": {"/usr/local/bin/pilot-discovery", "discovery"}, |
| "istiod": {"/usr/local/bin/pilot-discovery", "discovery"}, |
| "citadel": {"/usr/local/bin/istio_ca", "citadel"}, |
| "galley": {"/usr/local/bin/galley", "galley"}, |
| "telemetry": {"/usr/local/bin/mixs", "mixer"}, |
| "policy": {"/usr/local/bin/mixs", "mixer"}, |
| "sidecar-injector": {"/usr/local/bin/sidecar-injector", "sidecar-injector-webhook"}, |
| } |
| |
| component := pod.Labels["istio"] |
| |
| // Special cases |
| switch component { |
| case "statsd-prom-bridge": |
| // statsd-prom-bridge doesn't support version |
| return nil, fmt.Errorf("statsd-prom-bridge doesn't support version") |
| case "mixer": |
| component = pod.Labels["istio-mixer-type"] |
| } |
| |
| detail, ok := labelToPodDetail[component] |
| if !ok { |
| return nil, fmt.Errorf("unknown Istio component %q", component) |
| } |
| |
| stdout, stderr, err := c.PodExec(pod.Name, pod.Namespace, detail.container, |
| fmt.Sprintf("%s version -o json", detail.binary)) |
| if err != nil { |
| return nil, fmt.Errorf("error exec'ing into %s %s container: %w", pod.Name, detail.container, err) |
| } |
| |
| var v version.Version |
| err = json.Unmarshal([]byte(stdout), &v) |
| if err == nil && v.ClientVersion.Version != "" { |
| return v.ClientVersion, nil |
| } |
| |
| return nil, fmt.Errorf("error reading %s %s container version: %v", pod.Name, detail.container, stderr) |
| } |
| |
| func (c *client) NewPortForwarder(podName, ns, localAddress string, localPort int, podPort int) (PortForwarder, error) { |
| return newPortForwarder(c.config, podName, ns, localAddress, localPort, podPort) |
| } |
| |
| func (c *client) PodsForSelector(ctx context.Context, namespace string, labelSelectors ...string) (*v1.PodList, error) { |
| return c.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ |
| LabelSelector: strings.Join(labelSelectors, ","), |
| }) |
| } |
| |
| func (c *client) ApplyYAMLFiles(namespace string, yamlFiles ...string) error { |
| g, _ := errgroup.WithContext(context.TODO()) |
| for _, f := range removeEmptyFiles(yamlFiles) { |
| f := f |
| g.Go(func() error { |
| return c.applyYAMLFile(namespace, false, f) |
| }) |
| } |
| return g.Wait() |
| } |
| |
| func (c *client) ApplyYAMLFilesDryRun(namespace string, yamlFiles ...string) error { |
| g, _ := errgroup.WithContext(context.TODO()) |
| for _, f := range removeEmptyFiles(yamlFiles) { |
| f := f |
| g.Go(func() error { |
| return c.applyYAMLFile(namespace, true, f) |
| }) |
| } |
| return g.Wait() |
| } |
| |
| func (c *client) CreatePerRPCCredentials(_ context.Context, tokenNamespace, tokenServiceAccount string, audiences []string, |
| expirationSeconds int64) (credentials.PerRPCCredentials, error) { |
| return NewRPCCredentials(c, tokenNamespace, tokenServiceAccount, audiences, expirationSeconds, 60) |
| } |
| |
| func (c *client) UtilFactory() util.Factory { |
| return c.clientFactory |
| } |
| |
| // TODO once we drop Kubernetes 1.15 support we can drop all of this code in favor of Server Side Apply |
| // Following https://ymmt2005.hatenablog.com/entry/2020/04/14/An_example_of_using_dynamic_client_of_k8s.io/client-go |
| func (c *client) applyYAMLFile(namespace string, dryRun bool, file string) error { |
| // Create the options. |
| streams, _, stdout, stderr := genericclioptions.NewTestIOStreams() |
| flags := apply.NewApplyFlags(c.clientFactory, streams) |
| flags.DeleteFlags.FileNameFlags.Filenames = &[]string{file} |
| |
| cmd := apply.NewCmdApply("", c.clientFactory, streams) |
| opts, err := flags.ToOptions(cmd, "", nil) |
| if err != nil { |
| return err |
| } |
| opts.DynamicClient = c.dynamic |
| opts.DryRunVerifier = resource.NewQueryParamVerifier(c.dynamic, c.discoveryClient, resource.QueryParamDryRun) |
| opts.FieldValidationVerifier = resource.NewQueryParamVerifier(c.dynamic, c.clientFactory.OpenAPIGetter(), resource.QueryParamFieldValidation) |
| opts.FieldManager = fieldManager |
| if dryRun { |
| opts.DryRunStrategy = util.DryRunServer |
| } |
| |
| // allow for a success message operation to be specified at print time |
| opts.ToPrinter = func(operation string) (printers.ResourcePrinter, error) { |
| opts.PrintFlags.NamePrintFlags.Operation = operation |
| util.PrintFlagsWithDryRunStrategy(opts.PrintFlags, opts.DryRunStrategy) |
| return opts.PrintFlags.ToPrinter() |
| } |
| |
| if len(namespace) > 0 { |
| opts.Namespace = namespace |
| opts.EnforceNamespace = true |
| } else { |
| var err error |
| opts.Namespace, opts.EnforceNamespace, err = c.clientFactory.ToRawKubeConfigLoader().Namespace() |
| if err != nil { |
| return err |
| } |
| } |
| |
| opts.DeleteOptions = &kubectlDelete.DeleteOptions{ |
| DynamicClient: c.dynamic, |
| IOStreams: streams, |
| FilenameOptions: flags.DeleteFlags.FileNameFlags.ToOptions(), |
| } |
| |
| opts.OpenAPISchema, _ = c.clientFactory.OpenAPISchema() |
| |
| opts.Validator, err = c.clientFactory.Validator(metav1.FieldValidationStrict, opts.FieldValidationVerifier) |
| if err != nil { |
| return err |
| } |
| opts.Builder = c.clientFactory.NewBuilder() |
| opts.Mapper = c.mapper |
| |
| opts.PostProcessorFn = opts.PrintAndPrunePostProcessor() |
| |
| if err := opts.Run(); err != nil { |
| // Concatenate the stdout and stderr |
| s := stdout.String() + stderr.String() |
| return fmt.Errorf("%v: %s", err, s) |
| } |
| // If we are changing CRDs, invalidate the discovery client so future calls will not fail |
| if !dryRun { |
| f, _ := os.ReadFile(file) |
| if len(yml.SplitYamlByKind(string(f))[gvk.CustomResourceDefinition.Kind]) > 0 { |
| c.discoveryClient.Invalidate() |
| } |
| } |
| return nil |
| } |
| |
| func (c *client) DeleteYAMLFiles(namespace string, yamlFiles ...string) (err error) { |
| yamlFiles = removeEmptyFiles(yamlFiles) |
| |
| // Run each delete concurrently and collect the errors. |
| errs := make([]error, len(yamlFiles)) |
| g, _ := errgroup.WithContext(context.TODO()) |
| for i, f := range yamlFiles { |
| i, f := i, f |
| g.Go(func() error { |
| errs[i] = c.deleteFile(namespace, false, f) |
| return errs[i] |
| }) |
| } |
| _ = g.Wait() |
| return multierror.Append(nil, errs...).ErrorOrNil() |
| } |
| |
| func (c *client) DeleteYAMLFilesDryRun(namespace string, yamlFiles ...string) (err error) { |
| yamlFiles = removeEmptyFiles(yamlFiles) |
| |
| // Run each delete concurrently and collect the errors. |
| errs := make([]error, len(yamlFiles)) |
| g, _ := errgroup.WithContext(context.TODO()) |
| for i, f := range yamlFiles { |
| i, f := i, f |
| g.Go(func() error { |
| errs[i] = c.deleteFile(namespace, true, f) |
| return errs[i] |
| }) |
| } |
| _ = g.Wait() |
| return multierror.Append(nil, errs...).ErrorOrNil() |
| } |
| |
| func (c *client) deleteFile(namespace string, dryRun bool, file string) error { |
| // Create the options. |
| streams, _, stdout, stderr := genericclioptions.NewTestIOStreams() |
| |
| cmdNamespace, enforceNamespace, err := c.clientFactory.ToRawKubeConfigLoader().Namespace() |
| if err != nil { |
| return err |
| } |
| |
| if len(namespace) > 0 { |
| cmdNamespace = namespace |
| enforceNamespace = true |
| } |
| |
| fileOpts := resource.FilenameOptions{ |
| Filenames: []string{file}, |
| } |
| |
| opts := kubectlDelete.DeleteOptions{ |
| FilenameOptions: fileOpts, |
| CascadingStrategy: metav1.DeletePropagationBackground, |
| GracePeriod: -1, |
| IgnoreNotFound: true, |
| WaitForDeletion: true, |
| WarnClusterScope: enforceNamespace, |
| DynamicClient: c.dynamic, |
| DryRunVerifier: resource.NewQueryParamVerifier(c.dynamic, c.discoveryClient, resource.QueryParamDryRun), |
| IOStreams: streams, |
| } |
| if dryRun { |
| opts.DryRunStrategy = util.DryRunServer |
| } |
| |
| r := c.clientFactory.NewBuilder(). |
| Unstructured(). |
| ContinueOnError(). |
| NamespaceParam(cmdNamespace).DefaultNamespace(). |
| FilenameParam(enforceNamespace, &fileOpts). |
| LabelSelectorParam(opts.LabelSelector). |
| FieldSelectorParam(opts.FieldSelector). |
| SelectAllParam(opts.DeleteAll). |
| AllNamespaces(opts.DeleteAllNamespaces). |
| Flatten(). |
| Do() |
| err = r.Err() |
| if err != nil { |
| return err |
| } |
| opts.Result = r |
| |
| opts.Mapper = c.mapper |
| |
| if err := opts.RunDelete(c.clientFactory); err != nil { |
| // Concatenate the stdout and stderr |
| s := stdout.String() + stderr.String() |
| return fmt.Errorf("%v: %s", err, s) |
| } |
| return nil |
| } |
| |
| func closeQuietly(c io.Closer) { |
| _ = c.Close() |
| } |
| |
| func removeEmptyFiles(files []string) []string { |
| out := make([]string, 0, len(files)) |
| for _, f := range files { |
| if !isEmptyFile(f) { |
| out = append(out, f) |
| } |
| } |
| return out |
| } |
| |
| func isEmptyFile(f string) bool { |
| fileInfo, err := os.Stat(f) |
| if err != nil { |
| return true |
| } |
| if fileInfo.Size() == 0 { |
| return true |
| } |
| return false |
| } |
| |
| // IstioScheme returns a scheme will all known Istio-related types added |
| var IstioScheme = istioScheme() |
| |
| // FakeIstioScheme is an IstioScheme that has List type registered. |
| var FakeIstioScheme = func() *runtime.Scheme { |
| s := istioScheme() |
| // Workaround https://github.com/kubernetes/kubernetes/issues/107823 |
| s.AddKnownTypeWithName(schema.GroupVersionKind{Group: "fake-metadata-client-group", Version: "v1", Kind: "List"}, &metav1.List{}) |
| return s |
| }() |
| |
| func istioScheme() *runtime.Scheme { |
| scheme := runtime.NewScheme() |
| utilruntime.Must(kubescheme.AddToScheme(scheme)) |
| utilruntime.Must(mcs.AddToScheme(scheme)) |
| utilruntime.Must(clientnetworkingalpha.AddToScheme(scheme)) |
| utilruntime.Must(clientnetworkingbeta.AddToScheme(scheme)) |
| utilruntime.Must(clientsecurity.AddToScheme(scheme)) |
| utilruntime.Must(clienttelemetry.AddToScheme(scheme)) |
| utilruntime.Must(clientextensions.AddToScheme(scheme)) |
| utilruntime.Must(gatewayapi.AddToScheme(scheme)) |
| utilruntime.Must(apis.AddToScheme(scheme)) |
| utilruntime.Must(apiextensionsv1.AddToScheme(scheme)) |
| return scheme |
| } |
| |
| func setServerInfoWithIstiodVersionInfo(serverInfo *version.BuildInfo, istioInfo string) { |
| versionParts := strings.Split(istioInfo, "-") |
| nParts := len(versionParts) |
| if nParts >= 3 { |
| // The format will be like 1.12.0-016bc46f4a5e0ef3fa135b3c5380ab7765467c1a-dirty-Modified |
| // version is '1.12.0' |
| // revision is '016bc46f4a5e0ef3fa135b3c5380ab7765467c1a-dirty' |
| // status is 'Modified' |
| serverInfo.Version = versionParts[0] |
| serverInfo.GitTag = serverInfo.Version |
| serverInfo.GitRevision = strings.Join(versionParts[1:nParts-1], "-") |
| serverInfo.BuildStatus = versionParts[nParts-1] |
| } else { |
| serverInfo.Version = istioInfo |
| } |
| } |