| // Copyright Istio Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package kube |
| |
| import ( |
| "context" |
| "fmt" |
| "sort" |
| "time" |
| ) |
| |
| import ( |
| "github.com/hashicorp/go-multierror" |
| kubeApiCore "k8s.io/api/core/v1" |
| "k8s.io/apimachinery/pkg/api/errors" |
| kubeApiMeta "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/client-go/kubernetes" |
| ) |
| |
| import ( |
| istioKube "github.com/apache/dubbo-go-pixiu/pkg/kube" |
| "github.com/apache/dubbo-go-pixiu/pkg/test" |
| "github.com/apache/dubbo-go-pixiu/pkg/test/scopes" |
| "github.com/apache/dubbo-go-pixiu/pkg/test/util/retry" |
| ) |
| |
| var ( |
| defaultRetryTimeout = retry.Timeout(time.Minute * 10) |
| defaultRetryDelay = retry.BackoffDelay(time.Millisecond * 200) |
| ) |
| |
| // PodFetchFunc fetches pods from a k8s Client. |
| type PodFetchFunc func() ([]kubeApiCore.Pod, error) |
| |
| // NewPodFetch creates a new PodFetchFunction that fetches all pods matching the namespace and label selectors. |
| func NewPodFetch(a istioKube.ExtendedClient, namespace string, selectors ...string) PodFetchFunc { |
| return func() ([]kubeApiCore.Pod, error) { |
| pods, err := a.PodsForSelector(context.TODO(), namespace, selectors...) |
| if err != nil { |
| return nil, err |
| } |
| return pods.Items, nil |
| } |
| } |
| |
| // NewSinglePodFetch creates a new PodFetchFunction that fetches a single pod matching the given label selectors. |
| func NewSinglePodFetch(a istioKube.ExtendedClient, namespace string, selectors ...string) PodFetchFunc { |
| return func() ([]kubeApiCore.Pod, error) { |
| pods, err := a.PodsForSelector(context.TODO(), namespace, selectors...) |
| if err != nil { |
| return nil, err |
| } |
| |
| if len(pods.Items) == 0 { |
| return nil, fmt.Errorf("no matching pod found for selectors: %v", selectors) |
| } |
| |
| if len(pods.Items) > 1 { |
| scopes.Framework.Warnf("More than one pod found matching selectors: %v", selectors) |
| } |
| |
| return []kubeApiCore.Pod{pods.Items[0]}, nil |
| } |
| } |
| |
| // NewPodMustFetch creates a new PodFetchFunction that fetches all pods matching the namespace and label selectors. |
| // If no pods are found, an error is returned |
| func NewPodMustFetch(a istioKube.ExtendedClient, namespace string, selectors ...string) PodFetchFunc { |
| return func() ([]kubeApiCore.Pod, error) { |
| pods, err := a.PodsForSelector(context.TODO(), namespace, selectors...) |
| if err != nil { |
| return nil, err |
| } |
| if len(pods.Items) == 0 { |
| return nil, fmt.Errorf("no pods found for %v", selectors) |
| } |
| return pods.Items, nil |
| } |
| } |
| |
| // CheckPodsAreReady checks whether the pods that are selected by the given function is in ready state or not. |
| func CheckPodsAreReady(fetchFunc PodFetchFunc) ([]kubeApiCore.Pod, error) { |
| scopes.Framework.Infof("Checking pods ready...") |
| |
| fetched, err := fetchFunc() |
| if err != nil { |
| scopes.Framework.Infof("Failed retrieving pods: %v", err) |
| return nil, err |
| } |
| |
| if len(fetched) == 0 { |
| scopes.Framework.Infof("No pods found...") |
| return nil, fmt.Errorf("no pods fetched") |
| } |
| |
| for i, p := range fetched { |
| msg := "Ready" |
| if e := istioKube.CheckPodReadyOrComplete(&p); e != nil { |
| msg = e.Error() |
| err = multierror.Append(err, fmt.Errorf("%s/%s: %s", p.Namespace, p.Name, msg)) |
| } |
| scopes.Framework.Infof(" [%2d] %45s %15s (%v)", i, p.Name, p.Status.Phase, msg) |
| } |
| |
| if err != nil { |
| return nil, err |
| } |
| |
| return fetched, nil |
| } |
| |
| // DeleteOptionsForeground creates new delete options that will block until the operation completes. |
| func DeleteOptionsForeground() kubeApiMeta.DeleteOptions { |
| propagationPolicy := kubeApiMeta.DeletePropagationForeground |
| gracePeriod := int64(0) |
| return kubeApiMeta.DeleteOptions{ |
| PropagationPolicy: &propagationPolicy, |
| GracePeriodSeconds: &gracePeriod, |
| } |
| } |
| |
| // WaitUntilPodsAreReady waits until the pod with the name/namespace is in ready state. |
| func WaitUntilPodsAreReady(fetchFunc PodFetchFunc, opts ...retry.Option) ([]kubeApiCore.Pod, error) { |
| var pods []kubeApiCore.Pod |
| err := retry.UntilSuccess(func() error { |
| scopes.Framework.Infof("Checking pods ready...") |
| |
| fetched, err := CheckPodsAreReady(fetchFunc) |
| if err != nil { |
| return err |
| } |
| pods = fetched |
| return nil |
| }, newRetryOptions(opts...)...) |
| |
| return pods, err |
| } |
| |
| // WaitUntilServiceEndpointsAreReady will wait until the service with the given name/namespace is present, and have at least |
| // one usable endpoint. |
| func WaitUntilServiceEndpointsAreReady(a kubernetes.Interface, ns string, name string, |
| opts ...retry.Option) (*kubeApiCore.Service, *kubeApiCore.Endpoints, error) { |
| var service *kubeApiCore.Service |
| var endpoints *kubeApiCore.Endpoints |
| err := retry.UntilSuccess(func() error { |
| s, err := a.CoreV1().Services(ns).Get(context.TODO(), name, kubeApiMeta.GetOptions{}) |
| if err != nil { |
| return err |
| } |
| |
| eps, err := a.CoreV1().Endpoints(ns).Get(context.TODO(), name, kubeApiMeta.GetOptions{}) |
| if err != nil { |
| return err |
| } |
| if len(eps.Subsets) == 0 { |
| return fmt.Errorf("%s/%v endpoint not ready: no subsets", ns, name) |
| } |
| |
| for _, subset := range eps.Subsets { |
| if len(subset.Addresses) > 0 && len(subset.NotReadyAddresses) == 0 { |
| service = s |
| endpoints = eps |
| return nil |
| } |
| } |
| return fmt.Errorf("%s/%v endpoint not ready: no ready addresses", ns, name) |
| }, newRetryOptions(opts...)...) |
| if err != nil { |
| return nil, nil, err |
| } |
| |
| return service, endpoints, nil |
| } |
| |
| // WaitForSecretToExist waits for the given secret up to the given waitTime. |
| func WaitForSecretToExist(a kubernetes.Interface, namespace, name string, waitTime time.Duration) (*kubeApiCore.Secret, error) { |
| secret := a.CoreV1().Secrets(namespace) |
| |
| watch, err := secret.Watch(context.TODO(), kubeApiMeta.ListOptions{}) |
| if err != nil { |
| return nil, fmt.Errorf("failed to set up watch for secret (error: %v)", err) |
| } |
| events := watch.ResultChan() |
| |
| startTime := time.Now() |
| for { |
| select { |
| case event := <-events: |
| secret := event.Object.(*kubeApiCore.Secret) |
| if secret.GetName() == name { |
| return secret, nil |
| } |
| case <-time.After(waitTime - time.Since(startTime)): |
| return nil, fmt.Errorf("secret %v did not become existent within %v", |
| name, waitTime) |
| } |
| } |
| } |
| |
| // WaitForSecretToExistOrFail calls WaitForSecretToExist and fails the given test.Failer if an error occurs. |
| func WaitForSecretToExistOrFail(t test.Failer, a kubernetes.Interface, namespace, name string, |
| waitTime time.Duration) *kubeApiCore.Secret { |
| t.Helper() |
| s, err := WaitForSecretToExist(a, namespace, name, waitTime) |
| if err != nil { |
| t.Fatal(err) |
| } |
| return s |
| } |
| |
| // WaitForNamespaceDeletion waits until a namespace is deleted. |
| func WaitForNamespaceDeletion(a kubernetes.Interface, ns string, opts ...retry.Option) error { |
| return retry.UntilSuccess(func() error { |
| _, err := a.CoreV1().Namespaces().Get(context.TODO(), ns, kubeApiMeta.GetOptions{}) |
| if err == nil { |
| return fmt.Errorf("namespace %v still exists", ns) |
| } |
| |
| if errors.IsNotFound(err) { |
| return nil |
| } |
| |
| return err |
| }, newRetryOptions(opts...)...) |
| } |
| |
| // NamespaceExists returns true if the given namespace exists. |
| func NamespaceExists(a kubernetes.Interface, ns string) bool { |
| allNs, err := a.CoreV1().Namespaces().List(context.TODO(), kubeApiMeta.ListOptions{}) |
| if err != nil { |
| return false |
| } |
| for _, n := range allNs.Items { |
| if n.Name == ns { |
| return true |
| } |
| } |
| return false |
| } |
| |
| func newRetryOptions(opts ...retry.Option) []retry.Option { |
| out := make([]retry.Option, 0, 2+len(opts)) |
| out = append(out, defaultRetryTimeout, defaultRetryDelay) |
| out = append(out, opts...) |
| return out |
| } |
| |
| // MutatingWebhookConfigurationsExists returns true if all the given mutating webhook configs exist. |
| func MutatingWebhookConfigurationsExists(a kubernetes.Interface, names []string) bool { |
| cfgs, err := a.AdmissionregistrationV1().MutatingWebhookConfigurations().List(context.TODO(), kubeApiMeta.ListOptions{}) |
| if err != nil { |
| return false |
| } |
| |
| if len(cfgs.Items) != len(names) { |
| return false |
| } |
| |
| sort.Strings(names) |
| for _, cfg := range cfgs.Items { |
| if idx := sort.SearchStrings(names, cfg.Name); idx == len(names) { |
| return false |
| } |
| } |
| |
| return true |
| } |
| |
| // ValidatingWebhookConfigurationsExists returns true if all the given validating webhook configs exist. |
| func ValidatingWebhookConfigurationsExists(a kubernetes.Interface, names []string) bool { |
| cfgs, err := a.AdmissionregistrationV1().ValidatingWebhookConfigurations().List(context.TODO(), kubeApiMeta.ListOptions{}) |
| if err != nil { |
| return false |
| } |
| |
| if len(cfgs.Items) != len(names) { |
| return false |
| } |
| |
| sort.Strings(names) |
| for _, cfg := range cfgs.Items { |
| if idx := sort.SearchStrings(names, cfg.Name); idx == len(names) { |
| return false |
| } |
| } |
| |
| return true |
| } |