blob: 34bd6f2665f2685b0676269b9d6842cf6aa273e4 [file] [log] [blame]
// Copyright Istio Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kube
import (
import (
clientextensions ""
clientnetworkingalpha ""
clientnetworkingbeta ""
clientsecurity ""
clienttelemetry ""
istioclient ""
istiofake ""
istioinformer ""
v1 ""
apiextensionsv1 ""
kubeExtClient ""
extfake ""
kubeExtInformers ""
metav1 ""
utilruntime ""
kubeVersion ""
dynamicfake ""
kubescheme ""
metadatafake ""
clienttesting ""
kubectlDelete ""
gatewayapi ""
gatewayapiclient ""
gatewayapifake ""
gatewayapiinformer ""
import (
const (
defaultLocalAddress = "localhost"
fieldManager = "istio-kube-client"
// Client is a helper for common Kubernetes client operations. This contains various different kubernetes
// clients using a shared config. It is expected that all of Istiod can share the same set of clients and
// informers. Sharing informers is especially important for load on the API server/Istiod itself.
type Client interface {
// TODO: stop embedding this, it will conflict with future additions. Use Kube() instead is preferred
// RESTConfig returns the Kubernetes rest.Config used to configure the clients.
RESTConfig() *rest.Config
// Ext returns the API extensions client.
Ext() kubeExtClient.Interface
// Kube returns the core kube client
Kube() kubernetes.Interface
// Dynamic client.
Dynamic() dynamic.Interface
// Metadata returns the Metadata kube client.
Metadata() metadata.Interface
// Istio returns the Istio kube client.
Istio() istioclient.Interface
// GatewayAPI returns the gateway-api kube client.
GatewayAPI() gatewayapiclient.Interface
// KubeInformer returns an informer for core kube client
KubeInformer() informers.SharedInformerFactory
// DynamicInformer returns an informer for dynamic client
DynamicInformer() dynamicinformer.DynamicSharedInformerFactory
// MetadataInformer returns an informer for metadata client
MetadataInformer() metadatainformer.SharedInformerFactory
// IstioInformer returns an informer for the istio client
IstioInformer() istioinformer.SharedInformerFactory
// GatewayAPIInformer returns an informer for the gateway-api client
GatewayAPIInformer() gatewayapiinformer.SharedInformerFactory
// ExtInformer returns an informer for the extension client
ExtInformer() kubeExtInformers.SharedInformerFactory
// RunAndWait starts all informers and waits for their caches to sync.
// Warning: this must be called AFTER .Informer() is called, which will register the informer.
RunAndWait(stop <-chan struct{})
// GetKubernetesVersion returns the Kubernetes server version
GetKubernetesVersion() (*kubeVersion.Info, error)
// ExtendedClient is an extended client with additional helpers/functionality for Istioctl and testing.
type ExtendedClient interface {
// Revision of the Istio control plane.
Revision() string
// EnvoyDo makes an http request to the Envoy in the specified pod.
EnvoyDo(ctx context.Context, podName, podNamespace, method, path string) ([]byte, error)
// EnvoyDoWithPort makes an http request to the Envoy in the specified pod and port.
EnvoyDoWithPort(ctx context.Context, podName, podNamespace, method, path string, port int) ([]byte, error)
// AllDiscoveryDo makes an http request to each Istio discovery instance.
AllDiscoveryDo(ctx context.Context, namespace, path string) (map[string][]byte, error)
// GetIstioVersions gets the version for each Istio control plane component.
GetIstioVersions(ctx context.Context, namespace string) (*version.MeshInfo, error)
// PodsForSelector finds pods matching selector.
PodsForSelector(ctx context.Context, namespace string, labelSelectors ...string) (*v1.PodList, error)
// GetIstioPods retrieves the pod objects for Istio deployments
GetIstioPods(ctx context.Context, namespace string, params map[string]string) ([]v1.Pod, error)
// PodExecCommands takes a list of commands and the pod data to run the commands in the specified pod.
PodExecCommands(podName, podNamespace, container string, commands []string) (stdout string, stderr string, err error)
// PodExec takes a command and the pod data to run the command in the specified pod.
PodExec(podName, podNamespace, container string, command string) (stdout string, stderr string, err error)
// PodLogs retrieves the logs for the given pod.
PodLogs(ctx context.Context, podName string, podNamespace string, container string, previousLog bool) (string, error)
// NewPortForwarder creates a new PortForwarder configured for the given pod. If localPort=0, a port will be
// dynamically selected. If localAddress is empty, "localhost" is used.
NewPortForwarder(podName string, ns string, localAddress string, localPort int, podPort int) (PortForwarder, error)
// ApplyYAMLFiles applies the resources in the given YAML files.
ApplyYAMLFiles(namespace string, yamlFiles ...string) error
// ApplyYAMLFilesDryRun performs a dry run for applying the resource in the given YAML files
ApplyYAMLFilesDryRun(namespace string, yamlFiles ...string) error
// DeleteYAMLFiles deletes the resources in the given YAML files.
DeleteYAMLFiles(namespace string, yamlFiles ...string) error
// DeleteYAMLFilesDryRun performs a dry run for deleting the resources in the given YAML files.
DeleteYAMLFilesDryRun(namespace string, yamlFiles ...string) error
// CreatePerRPCCredentials creates a gRPC bearer token provider that can create (and renew!) Istio tokens
CreatePerRPCCredentials(ctx context.Context, tokenNamespace, tokenServiceAccount string, audiences []string,
expirationSeconds int64) (credentials.PerRPCCredentials, error)
// UtilFactory returns a kubectl factory
UtilFactory() util.Factory
var (
_ Client = &client{}
_ ExtendedClient = &client{}
const resyncInterval = 0
// NewFakeClient creates a new, fake, client
func NewFakeClient(objects ...runtime.Object) ExtendedClient {
c := &client{
informerWatchesPending: atomic.NewInt32(0),
c.Interface = fake.NewSimpleClientset(objects...)
c.kube = c.Interface
c.kubeInformer = informers.NewSharedInformerFactory(c.Interface, resyncInterval)
s := FakeIstioScheme
c.metadata = metadatafake.NewSimpleMetadataClient(s)
c.metadataInformer = metadatainformer.NewSharedInformerFactory(c.metadata, resyncInterval)
// Support some galley tests using basicmetadata
// If you are adding something to this list, consider other options like adding to the scheme.
gvrToListKind := map[schema.GroupVersionResource]string{
{Group: "", Version: "v1alpha1", Resource: "Kind1s"}: "Kind1List",
c.dynamic = dynamicfake.NewSimpleDynamicClientWithCustomListKinds(s, gvrToListKind)
c.dynamicInformer = dynamicinformer.NewDynamicSharedInformerFactory(c.dynamic, resyncInterval)
c.istio = istiofake.NewSimpleClientset()
c.istioInformer = istioinformer.NewSharedInformerFactoryWithOptions(c.istio, resyncInterval)
c.gatewayapi = gatewayapifake.NewSimpleClientset()
c.gatewayapiInformer = gatewayapiinformer.NewSharedInformerFactory(c.gatewayapi, resyncInterval)
c.extSet = extfake.NewSimpleClientset()
c.extInformer = kubeExtInformers.NewSharedInformerFactory(c.extSet, resyncInterval)
// There is a race condition in the client fakes, where events that happen between the List and Watch
// of an informer are dropped. To avoid this, we explicitly manage the list and watch, ensuring all lists
// have an associated watch before continuing.
// This would likely break any direct calls to List(), but for now our tests don't do that anyways. If we need
// to in the future we will need to identify the Lists that have a corresponding Watch, possibly by looking
// at created Informers
// an atomic.Int is used instead of sync.WaitGroup because wg.Add and wg.Wait cannot be called concurrently
listReactor := func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return false, nil, nil
watchReactor := func(tracker clienttesting.ObjectTracker) func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
return func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
gvr := action.GetResource()
ns := action.GetNamespace()
watch, err := tracker.Watch(gvr, ns)
if err != nil {
return false, nil, err
return true, watch, nil
for _, fc := range []fakeClient{
// TODO: send PR to client-go to add Tracker()
// c.metadata.(*metadatafake.FakeMetadataClient),
} {
fc.PrependReactor("list", "*", listReactor)
fc.PrependWatchReactor("*", watchReactor(fc.Tracker()))
// discoveryv1/EndpontSlices readable from discoveryv1beta1/EndpointSlices
c.mirrorQueue = queue.NewQueue(1 * time.Second)
c.fastSync = true
return c
func NewFakeClientWithVersion(minor string, objects ...runtime.Object) ExtendedClient {
c := NewFakeClient(objects...).(*client)
if minor != "" && minor != "latest" {
c.versionOnce.Do(func() {
c.version = &kubeVersion.Info{Major: "1", Minor: minor, GitVersion: fmt.Sprintf("v1.%v.0", minor)}
return c
type fakeClient interface {
PrependReactor(verb, resource string, reaction clienttesting.ReactionFunc)
PrependWatchReactor(resource string, reaction clienttesting.WatchReactionFunc)
Tracker() clienttesting.ObjectTracker
// Client is a helper wrapper around the Kube RESTClient for istioctl -> Pilot/Envoy/Mesh related things
type client struct {
clientFactory util.Factory
config *rest.Config
extSet kubeExtClient.Interface
extInformer kubeExtInformers.SharedInformerFactory
kube kubernetes.Interface
kubeInformer informers.SharedInformerFactory
dynamic dynamic.Interface
dynamicInformer dynamicinformer.DynamicSharedInformerFactory
metadata metadata.Interface
metadataInformer metadatainformer.SharedInformerFactory
istio istioclient.Interface
istioInformer istioinformer.SharedInformerFactory
gatewayapi gatewayapiclient.Interface
gatewayapiInformer gatewayapiinformer.SharedInformerFactory
// If enable, will wait for cache syncs with extremely short delay. This should be used only for tests
fastSync bool
informerWatchesPending *atomic.Int32
mirrorQueue queue.Instance
mirrorQueueStarted atomic.Bool
// These may be set only when creating an extended client.
revision string
restClient *rest.RESTClient
discoveryClient discovery.CachedDiscoveryInterface
mapper meta.RESTMapper
versionOnce sync.Once
version *kubeVersion.Info
// newClientInternal creates a Kubernetes client from the given factory.
func newClientInternal(clientFactory util.Factory, revision string) (*client, error) {
var c client
var err error
c.clientFactory = clientFactory
c.config, err = clientFactory.ToRESTConfig()
if err != nil {
return nil, err
c.revision = revision
c.restClient, err = clientFactory.RESTClient()
if err != nil {
return nil, err
c.discoveryClient, err = clientFactory.ToDiscoveryClient()
if err != nil {
return nil, err
c.mapper, err = clientFactory.ToRESTMapper()
if err != nil {
return nil, err
c.Interface, err = kubernetes.NewForConfig(c.config)
c.kube = c.Interface
if err != nil {
return nil, err
c.kubeInformer = informers.NewSharedInformerFactory(c.Interface, resyncInterval)
c.metadata, err = metadata.NewForConfig(c.config)
if err != nil {
return nil, err
c.metadataInformer = metadatainformer.NewSharedInformerFactory(c.metadata, resyncInterval)
c.dynamic, err = dynamic.NewForConfig(c.config)
if err != nil {
return nil, err
c.dynamicInformer = dynamicinformer.NewDynamicSharedInformerFactory(c.dynamic, resyncInterval)
c.istio, err = istioclient.NewForConfig(c.config)
if err != nil {
return nil, err
c.istioInformer = istioinformer.NewSharedInformerFactory(c.istio, resyncInterval)
c.gatewayapi, err = gatewayapiclient.NewForConfig(c.config)
if err != nil {
return nil, err
c.gatewayapiInformer = gatewayapiinformer.NewSharedInformerFactory(c.gatewayapi, resyncInterval)
c.extSet, err = kubeExtClient.NewForConfig(c.config)
if err != nil {
return nil, err
c.extInformer = kubeExtInformers.NewSharedInformerFactory(c.extSet, resyncInterval)
return &c, nil
// NewDefaultClient returns a default client, using standard Kubernetes config resolution to determine
// the cluster to access.
func NewDefaultClient() (ExtendedClient, error) {
return NewExtendedClient(BuildClientCmd("", ""), "")
// NewExtendedClient creates a Kubernetes client from the given ClientConfig. The "revision" parameter
// controls the behavior of GetIstioPods, by selecting a specific revision of the control plane.
func NewExtendedClient(clientConfig clientcmd.ClientConfig, revision string) (ExtendedClient, error) {
return newClientInternal(newClientFactory(clientConfig), revision)
// NewClient creates a Kubernetes client from the given rest config.
func NewClient(clientConfig clientcmd.ClientConfig) (Client, error) {
return newClientInternal(newClientFactory(clientConfig), "")
func (c *client) RESTConfig() *rest.Config {
if c.config == nil {
return nil
cpy := *c.config
return &cpy
func (c *client) Ext() kubeExtClient.Interface {
return c.extSet
func (c *client) Dynamic() dynamic.Interface {
return c.dynamic
func (c *client) Kube() kubernetes.Interface {
return c.kube
func (c *client) Metadata() metadata.Interface {
return c.metadata
func (c *client) Istio() istioclient.Interface {
return c.istio
func (c *client) GatewayAPI() gatewayapiclient.Interface {
return c.gatewayapi
func (c *client) KubeInformer() informers.SharedInformerFactory {
return c.kubeInformer
func (c *client) DynamicInformer() dynamicinformer.DynamicSharedInformerFactory {
return c.dynamicInformer
func (c *client) MetadataInformer() metadatainformer.SharedInformerFactory {
return c.metadataInformer
func (c *client) IstioInformer() istioinformer.SharedInformerFactory {
return c.istioInformer
func (c *client) GatewayAPIInformer() gatewayapiinformer.SharedInformerFactory {
return c.gatewayapiInformer
func (c *client) ExtInformer() kubeExtInformers.SharedInformerFactory {
return c.extInformer
// RunAndWait starts all informers and waits for their caches to sync.
// Warning: this must be called AFTER .Informer() is called, which will register the informer.
func (c *client) RunAndWait(stop <-chan struct{}) {
if c.mirrorQueue != nil && !c.mirrorQueueStarted.Load() {
go c.mirrorQueue.Run(stop)
if c.fastSync {
// WaitForCacheSync will virtually never be synced on the first call, as its called immediately after Start()
// This triggers a 100ms delay per call, which is often called 2-3 times in a test, delaying tests.
// Instead, we add an aggressive sync polling
fastWaitForCacheSync(stop, c.kubeInformer)
fastWaitForCacheSyncDynamic(stop, c.dynamicInformer)
fastWaitForCacheSyncDynamic(stop, c.metadataInformer)
fastWaitForCacheSync(stop, c.istioInformer)
fastWaitForCacheSync(stop, c.gatewayapiInformer)
fastWaitForCacheSync(stop, c.extInformer)
_ = wait.PollImmediate(time.Microsecond*100, wait.ForeverTestTimeout, func() (bool, error) {
select {
case <-stop:
return false, fmt.Errorf("channel closed")
if c.informerWatchesPending.Load() == 0 {
return true, nil
return false, nil
} else {
func (c *client) GetKubernetesVersion() (*kubeVersion.Info, error) {
c.versionOnce.Do(func() {
v, err := c.Discovery().ServerVersion()
if err == nil {
c.version = v
if c.version != nil {
return c.version, nil
// Initial attempt failed, retry on each call to this function
v, err := c.Discovery().ServerVersion()
if err != nil {
c.version = v
return c.version, err
type reflectInformerSync interface {
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
type dynamicInformerSync interface {
WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
// Wait for cache sync immediately, rather than with 100ms delay which slows tests
// See
func fastWaitForCacheSync(stop <-chan struct{}, informerFactory reflectInformerSync) {
returnImmediately := make(chan struct{})
_ = wait.PollImmediate(time.Microsecond*100, wait.ForeverTestTimeout, func() (bool, error) {
select {
case <-stop:
return false, fmt.Errorf("channel closed")
for _, synced := range informerFactory.WaitForCacheSync(returnImmediately) {
if !synced {
return false, nil
return true, nil
func fastWaitForCacheSyncDynamic(stop <-chan struct{}, informerFactory dynamicInformerSync) {
returnImmediately := make(chan struct{})
_ = wait.PollImmediate(time.Microsecond*100, wait.ForeverTestTimeout, func() (bool, error) {
select {
case <-stop:
return false, fmt.Errorf("channel closed")
for _, synced := range informerFactory.WaitForCacheSync(returnImmediately) {
if !synced {
return false, nil
return true, nil
// WaitForCacheSyncInterval waits for caches to populate, with explicitly configured interval
func WaitForCacheSyncInterval(stopCh <-chan struct{}, interval time.Duration, cacheSyncs ...cache.InformerSynced) bool {
err := wait.PollImmediateUntil(interval,
func() (bool, error) {
for _, syncFunc := range cacheSyncs {
if !syncFunc() {
return false, nil
return true, nil
return err == nil
func (c *client) Revision() string {
return c.revision
func (c *client) PodExecCommands(podName, podNamespace, container string, commands []string) (stdout, stderr string, err error) {
defer func() {
if err != nil {
if len(stderr) > 0 {
err = fmt.Errorf("error exec'ing into %s/%s %s container: %v\n%s",
podNamespace, podName, container, err, stderr)
} else {
err = fmt.Errorf("error exec'ing into %s/%s %s container: %v",
podNamespace, podName, container, err)
req := c.restClient.Post().
Param("container", container).
Container: container,
Command: commands,
Stdin: false,
Stdout: true,
Stderr: true,
TTY: false,
}, kubescheme.ParameterCodec)
wrapper, upgrader, err := roundTripperFor(c.config)
if err != nil {
return "", "", err
exec, err := remotecommand.NewSPDYExecutorForTransports(wrapper, upgrader, "POST", req.URL())
if err != nil {
return "", "", err
var stdoutBuf, stderrBuf bytes.Buffer
err = exec.Stream(remotecommand.StreamOptions{
Stdin: nil,
Stdout: &stdoutBuf,
Stderr: &stderrBuf,
Tty: false,
stdout = stdoutBuf.String()
stderr = stderrBuf.String()
func (c *client) PodExec(podName, podNamespace, container string, command string) (stdout, stderr string, err error) {
commandFields := strings.Fields(command)
return c.PodExecCommands(podName, podNamespace, container, commandFields)
func (c *client) PodLogs(ctx context.Context, podName, podNamespace, container string, previousLog bool) (string, error) {
opts := &v1.PodLogOptions{
Container: container,
Previous: previousLog,
res, err := c.CoreV1().Pods(podNamespace).GetLogs(podName, opts).Stream(ctx)
if err != nil {
return "", err
defer closeQuietly(res)
builder := &strings.Builder{}
if _, err = io.Copy(builder, res); err != nil {
return "", err
return builder.String(), nil
func (c *client) AllDiscoveryDo(ctx context.Context, istiodNamespace, path string) (map[string][]byte, error) {
istiods, err := c.GetIstioPods(ctx, istiodNamespace, map[string]string{
"labelSelector": "app=istiod",
"fieldSelector": "status.phase=Running",
if err != nil {
return nil, err
if len(istiods) == 0 {
return nil, errors.New("unable to find any Istiod instances")
result := map[string][]byte{}
for _, istiod := range istiods {
res, err := c.portForwardRequest(ctx, istiod.Name, istiod.Namespace, http.MethodGet, path, 15014)
if err != nil {
return nil, err
if len(res) > 0 {
result[istiod.Name] = res
// If any Discovery servers responded, treat as a success
if len(result) > 0 {
return result, nil
return nil, nil
func (c *client) EnvoyDo(ctx context.Context, podName, podNamespace, method, path string) ([]byte, error) {
return c.portForwardRequest(ctx, podName, podNamespace, method, path, 15000)
func (c *client) EnvoyDoWithPort(ctx context.Context, podName, podNamespace, method, path string, port int) ([]byte, error) {
return c.portForwardRequest(ctx, podName, podNamespace, method, path, port)
func (c *client) portForwardRequest(ctx context.Context, podName, podNamespace, method, path string, port int) ([]byte, error) {
formatError := func(err error) error {
return fmt.Errorf("failure running port forward process: %v", err)
fw, err := c.NewPortForwarder(podName, podNamespace, "", 0, port)
if err != nil {
return nil, err
if err = fw.Start(); err != nil {
return nil, formatError(err)
defer fw.Close()
req, err := http.NewRequest(method, fmt.Sprintf("http://%s/%s", fw.Address(), path), nil)
if err != nil {
return nil, formatError(err)
resp, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return nil, formatError(err)
defer closeQuietly(resp.Body)
out, err := io.ReadAll(resp.Body)
if err != nil {
return nil, formatError(err)
return out, nil
func (c *client) GetIstioPods(ctx context.Context, namespace string, params map[string]string) ([]v1.Pod, error) {
if c.revision != "" {
labelSelector, ok := params["labelSelector"]
if ok {
params["labelSelector"] = fmt.Sprintf("%s,%s=%s", labelSelector, label.IoIstioRev.Name, c.revision)
} else {
params["labelSelector"] = fmt.Sprintf("%s=%s", label.IoIstioRev.Name, c.revision)
req := c.restClient.Get().
for k, v := range params {
req.Param(k, v)
res := req.Do(ctx)
if res.Error() != nil {
return nil, fmt.Errorf("unable to retrieve Pods: %v", res.Error())
list := &v1.PodList{}
if err := res.Into(list); err != nil {
return nil, fmt.Errorf("unable to parse PodList: %v", res.Error())
return list.Items, nil
// ExtractExecResult wraps PodExec and return the execution result and error if has any.
func (c *client) extractExecResult(podName, podNamespace, container, cmd string) (string, error) {
stdout, stderr, err := c.PodExec(podName, podNamespace, container, cmd)
if err != nil {
if stderr != "" {
return "", fmt.Errorf("error exec'ing into %s/%s %s container: %w\n%s", podNamespace, podName, container, err, stderr)
return "", fmt.Errorf("error exec'ing into %s/%s %s container: %w", podNamespace, podName, container, err)
return stdout, nil
func (c *client) GetIstioVersions(ctx context.Context, namespace string) (*version.MeshInfo, error) {
pods, err := c.GetIstioPods(ctx, namespace, map[string]string{
"labelSelector": "app=istiod",
"fieldSelector": "status.phase=Running",
if err != nil {
return nil, err
if len(pods) == 0 {
return nil, fmt.Errorf("no running Istio pods in %q", namespace)
var errs error
res := version.MeshInfo{}
for _, pod := range pods {
component := pod.Labels["istio"]
server := version.ServerInfo{Component: component}
// :15014/version returns something like
// 1.7-alpha.9c900ba74d10a1affe7c23557ef0eebd6103b03c-9c900ba74d10a1affe7c23557ef0eebd6103b03c-Clean
result, err := c.CoreV1().Pods(pod.Namespace).ProxyGet("", pod.Name, "15014", "/version", nil).DoRaw(ctx)
if err != nil {
bi, execErr := c.getIstioVersionUsingExec(&pod)
if execErr != nil {
errs = multierror.Append(errs,
fmt.Errorf("error port-forwarding into %s.%s: %v", pod.Namespace, pod.Name, err),
server.Info = *bi
res = append(res, server)
if len(result) > 0 {
setServerInfoWithIstiodVersionInfo(&server.Info, string(result))
// (Golang version not available through :15014/version endpoint)
res = append(res, server)
return &res, errs
func (c *client) getIstioVersionUsingExec(pod *v1.Pod) (*version.BuildInfo, error) {
// exclude data plane components from control plane list
labelToPodDetail := map[string]struct {
binary string
container string
"pilot": {"/usr/local/bin/pilot-discovery", "discovery"},
"istiod": {"/usr/local/bin/pilot-discovery", "discovery"},
"citadel": {"/usr/local/bin/istio_ca", "citadel"},
"galley": {"/usr/local/bin/galley", "galley"},
"telemetry": {"/usr/local/bin/mixs", "mixer"},
"policy": {"/usr/local/bin/mixs", "mixer"},
"sidecar-injector": {"/usr/local/bin/sidecar-injector", "sidecar-injector-webhook"},
component := pod.Labels["istio"]
// Special cases
switch component {
case "statsd-prom-bridge":
// statsd-prom-bridge doesn't support version
return nil, fmt.Errorf("statsd-prom-bridge doesn't support version")
case "mixer":
component = pod.Labels["istio-mixer-type"]
detail, ok := labelToPodDetail[component]
if !ok {
return nil, fmt.Errorf("unknown Istio component %q", component)
stdout, stderr, err := c.PodExec(pod.Name, pod.Namespace, detail.container,
fmt.Sprintf("%s version -o json", detail.binary))
if err != nil {
return nil, fmt.Errorf("error exec'ing into %s %s container: %w", pod.Name, detail.container, err)
var v version.Version
err = json.Unmarshal([]byte(stdout), &v)
if err == nil && v.ClientVersion.Version != "" {
return v.ClientVersion, nil
return nil, fmt.Errorf("error reading %s %s container version: %v", pod.Name, detail.container, stderr)
func (c *client) NewPortForwarder(podName, ns, localAddress string, localPort int, podPort int) (PortForwarder, error) {
return newPortForwarder(c.config, podName, ns, localAddress, localPort, podPort)
func (c *client) PodsForSelector(ctx context.Context, namespace string, labelSelectors ...string) (*v1.PodList, error) {
return c.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: strings.Join(labelSelectors, ","),
func (c *client) ApplyYAMLFiles(namespace string, yamlFiles ...string) error {
g, _ := errgroup.WithContext(context.TODO())
for _, f := range removeEmptyFiles(yamlFiles) {
f := f
g.Go(func() error {
return c.applyYAMLFile(namespace, false, f)
return g.Wait()
func (c *client) ApplyYAMLFilesDryRun(namespace string, yamlFiles ...string) error {
g, _ := errgroup.WithContext(context.TODO())
for _, f := range removeEmptyFiles(yamlFiles) {
f := f
g.Go(func() error {
return c.applyYAMLFile(namespace, true, f)
return g.Wait()
func (c *client) CreatePerRPCCredentials(_ context.Context, tokenNamespace, tokenServiceAccount string, audiences []string,
expirationSeconds int64) (credentials.PerRPCCredentials, error) {
return NewRPCCredentials(c, tokenNamespace, tokenServiceAccount, audiences, expirationSeconds, 60)
func (c *client) UtilFactory() util.Factory {
return c.clientFactory
// TODO once we drop Kubernetes 1.15 support we can drop all of this code in favor of Server Side Apply
// Following
func (c *client) applyYAMLFile(namespace string, dryRun bool, file string) error {
// Create the options.
streams, _, stdout, stderr := genericclioptions.NewTestIOStreams()
flags := apply.NewApplyFlags(c.clientFactory, streams)
flags.DeleteFlags.FileNameFlags.Filenames = &[]string{file}
cmd := apply.NewCmdApply("", c.clientFactory, streams)
opts, err := flags.ToOptions(cmd, "", nil)
if err != nil {
return err
opts.DynamicClient = c.dynamic
opts.DryRunVerifier = resource.NewQueryParamVerifier(c.dynamic, c.discoveryClient, resource.QueryParamDryRun)
opts.FieldValidationVerifier = resource.NewQueryParamVerifier(c.dynamic, c.clientFactory.OpenAPIGetter(), resource.QueryParamFieldValidation)
opts.FieldManager = fieldManager
if dryRun {
opts.DryRunStrategy = util.DryRunServer
// allow for a success message operation to be specified at print time
opts.ToPrinter = func(operation string) (printers.ResourcePrinter, error) {
opts.PrintFlags.NamePrintFlags.Operation = operation
util.PrintFlagsWithDryRunStrategy(opts.PrintFlags, opts.DryRunStrategy)
return opts.PrintFlags.ToPrinter()
if len(namespace) > 0 {
opts.Namespace = namespace
opts.EnforceNamespace = true
} else {
var err error
opts.Namespace, opts.EnforceNamespace, err = c.clientFactory.ToRawKubeConfigLoader().Namespace()
if err != nil {
return err
opts.DeleteOptions = &kubectlDelete.DeleteOptions{
DynamicClient: c.dynamic,
IOStreams: streams,
FilenameOptions: flags.DeleteFlags.FileNameFlags.ToOptions(),
opts.OpenAPISchema, _ = c.clientFactory.OpenAPISchema()
opts.Validator, err = c.clientFactory.Validator(metav1.FieldValidationStrict, opts.FieldValidationVerifier)
if err != nil {
return err
opts.Builder = c.clientFactory.NewBuilder()
opts.Mapper = c.mapper
opts.PostProcessorFn = opts.PrintAndPrunePostProcessor()
if err := opts.Run(); err != nil {
// Concatenate the stdout and stderr
s := stdout.String() + stderr.String()
return fmt.Errorf("%v: %s", err, s)
// If we are changing CRDs, invalidate the discovery client so future calls will not fail
if !dryRun {
f, _ := os.ReadFile(file)
if len(yml.SplitYamlByKind(string(f))[gvk.CustomResourceDefinition.Kind]) > 0 {
return nil
func (c *client) DeleteYAMLFiles(namespace string, yamlFiles ...string) (err error) {
yamlFiles = removeEmptyFiles(yamlFiles)
// Run each delete concurrently and collect the errors.
errs := make([]error, len(yamlFiles))
g, _ := errgroup.WithContext(context.TODO())
for i, f := range yamlFiles {
i, f := i, f
g.Go(func() error {
errs[i] = c.deleteFile(namespace, false, f)
return errs[i]
_ = g.Wait()
return multierror.Append(nil, errs...).ErrorOrNil()
func (c *client) DeleteYAMLFilesDryRun(namespace string, yamlFiles ...string) (err error) {
yamlFiles = removeEmptyFiles(yamlFiles)
// Run each delete concurrently and collect the errors.
errs := make([]error, len(yamlFiles))
g, _ := errgroup.WithContext(context.TODO())
for i, f := range yamlFiles {
i, f := i, f
g.Go(func() error {
errs[i] = c.deleteFile(namespace, true, f)
return errs[i]
_ = g.Wait()
return multierror.Append(nil, errs...).ErrorOrNil()
func (c *client) deleteFile(namespace string, dryRun bool, file string) error {
// Create the options.
streams, _, stdout, stderr := genericclioptions.NewTestIOStreams()
cmdNamespace, enforceNamespace, err := c.clientFactory.ToRawKubeConfigLoader().Namespace()
if err != nil {
return err
if len(namespace) > 0 {
cmdNamespace = namespace
enforceNamespace = true
fileOpts := resource.FilenameOptions{
Filenames: []string{file},
opts := kubectlDelete.DeleteOptions{
FilenameOptions: fileOpts,
CascadingStrategy: metav1.DeletePropagationBackground,
GracePeriod: -1,
IgnoreNotFound: true,
WaitForDeletion: true,
WarnClusterScope: enforceNamespace,
DynamicClient: c.dynamic,
DryRunVerifier: resource.NewQueryParamVerifier(c.dynamic, c.discoveryClient, resource.QueryParamDryRun),
IOStreams: streams,
if dryRun {
opts.DryRunStrategy = util.DryRunServer
r := c.clientFactory.NewBuilder().
FilenameParam(enforceNamespace, &fileOpts).
err = r.Err()
if err != nil {
return err
opts.Result = r
opts.Mapper = c.mapper
if err := opts.RunDelete(c.clientFactory); err != nil {
// Concatenate the stdout and stderr
s := stdout.String() + stderr.String()
return fmt.Errorf("%v: %s", err, s)
return nil
func closeQuietly(c io.Closer) {
_ = c.Close()
func removeEmptyFiles(files []string) []string {
out := make([]string, 0, len(files))
for _, f := range files {
if !isEmptyFile(f) {
out = append(out, f)
return out
func isEmptyFile(f string) bool {
fileInfo, err := os.Stat(f)
if err != nil {
return true
if fileInfo.Size() == 0 {
return true
return false
// IstioScheme returns a scheme will all known Istio-related types added
var IstioScheme = istioScheme()
// FakeIstioScheme is an IstioScheme that has List type registered.
var FakeIstioScheme = func() *runtime.Scheme {
s := istioScheme()
// Workaround
s.AddKnownTypeWithName(schema.GroupVersionKind{Group: "fake-metadata-client-group", Version: "v1", Kind: "List"}, &metav1.List{})
return s
func istioScheme() *runtime.Scheme {
scheme := runtime.NewScheme()
return scheme
func setServerInfoWithIstiodVersionInfo(serverInfo *version.BuildInfo, istioInfo string) {
versionParts := strings.Split(istioInfo, "-")
nParts := len(versionParts)
if nParts >= 3 {
// The format will be like 1.12.0-016bc46f4a5e0ef3fa135b3c5380ab7765467c1a-dirty-Modified
// version is '1.12.0'
// revision is '016bc46f4a5e0ef3fa135b3c5380ab7765467c1a-dirty'
// status is 'Modified'
serverInfo.Version = versionParts[0]
serverInfo.GitTag = serverInfo.Version
serverInfo.GitRevision = strings.Join(versionParts[1:nParts-1], "-")
serverInfo.BuildStatus = versionParts[nParts-1]
} else {
serverInfo.Version = istioInfo