blob: 3fb7159832652981047a1844255b445317dc7726 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package framework
import (
"bufio"
"bytes"
"cmp"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
"github.com/api7/gopkg/pkg/log"
"github.com/gavv/httpexpect/v2"
"github.com/gruntwork-io/terratest/modules/k8s"
"github.com/gruntwork-io/terratest/modules/testing"
. "github.com/onsi/gomega" //nolint:staticcheck
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"
applymetav1 "k8s.io/client-go/applyconfigurations/meta/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/scheme"
"k8s.io/utils/ptr"
)
// buildRestConfig returns a rest.Config for the kubeconfig context named by
// the context argument.
//
// The configuration is resolved lazily through client-go's default loading
// rules. Per the notes originally attached to this function, that covers the
// file(s) named by the KUBECONFIG environment variable, the in-cluster
// configuration when running inside a cluster, and $HOME/.kube/config.
// NOTE(review): no explicit kubeconfig path is set here, so only the default
// lookup locations apply — confirm against the clientcmd documentation.
func buildRestConfig(context string) (*rest.Config, error) {
	rules := clientcmd.NewDefaultClientConfigLoadingRules()
	rules.DefaultClientConfig = &clientcmd.DefaultClientConfig

	// Select the requested context on top of whatever the rules load.
	overrides := clientcmd.ConfigOverrides{
		ClusterDefaults: clientcmd.ClusterDefaults,
		CurrentContext:  context,
	}

	return clientcmd.
		NewNonInteractiveDeferredLoadingClientConfig(rules, &overrides).
		ClientConfig()
}
// ensureService waits for the Endpoints object name in namespace to expose
// exactly desiredEndpoints addresses, using the default 120-second budget of
// ensureServiceWithTimeout.
func (f *Framework) ensureService(name, namespace string, desiredEndpoints int) error {
	const defaultTimeoutSeconds = 120
	return f.ensureServiceWithTimeout(name, namespace, desiredEndpoints, defaultTimeoutSeconds)
}
// ensureServiceWithTimeout polls the Endpoints object name in namespace until
// the total number of addresses across all of its subsets equals
// desiredEndpoints.
//
// timeout is expressed in seconds and is converted into a number of attempts
// spaced 6 seconds apart. At least one attempt is always made, so a timeout
// shorter than the poll interval can no longer report success without ever
// looking at the endpoints.
//
// It returns nil once the count matches. Otherwise it returns the most recent
// failure observed while polling (a lookup error or a count mismatch), falling
// back to the backoff error itself if no attempt recorded one.
func (f *Framework) ensureServiceWithTimeout(name, namespace string, desiredEndpoints, timeout int) error {
	const pollIntervalSeconds = 6

	backoff := wait.Backoff{
		Duration: pollIntervalSeconds * time.Second,
		Factor:   1,
		// With Steps == 0 the condition would never run and the function
		// would return a nil lastErr, i.e. a false success.
		Steps: max(timeout/pollIntervalSeconds, 1),
	}

	var lastErr error
	condFunc := func() (bool, error) {
		ep, err := f.clientset.CoreV1().Endpoints(namespace).Get(f.Context, name, metav1.GetOptions{})
		if err != nil {
			// Lookup failures are retried rather than aborting the wait.
			lastErr = err
			log.Errorw("failed to list endpoints",
				zap.String("service", name),
				zap.Error(err),
			)
			return false, nil
		}

		count := 0
		for _, ss := range ep.Subsets {
			count += len(ss.Addresses)
		}
		if count == desiredEndpoints {
			return true, nil
		}

		log.Infow("endpoints count mismatch",
			zap.String("service", name),
			zap.Any("ep", ep),
			zap.Int("expected", desiredEndpoints),
			zap.Int("actual", count),
		)
		lastErr = fmt.Errorf("expected endpoints: %d but seen %d", desiredEndpoints, count)
		return false, nil
	}

	if err := wait.ExponentialBackoff(backoff, condFunc); err != nil {
		if lastErr != nil {
			return lastErr
		}
		// Never turn a failed wait into a nil error.
		return err
	}
	return nil
}
// GetServiceEndpoints returns the IP of every address listed in the subsets
// of the Endpoints object identified by nn. An empty nn.Namespace falls back
// to the package-level _namespace.
//
// The result is nil when the object has no addresses; a lookup failure is
// returned unchanged.
func (f *Framework) GetServiceEndpoints(nn types.NamespacedName) ([]string, error) {
	namespace := cmp.Or(nn.Namespace, _namespace)
	obj, err := f.clientset.CoreV1().Endpoints(namespace).Get(f.Context, nn.Name, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}

	var ips []string
	for i := range obj.Subsets {
		addresses := obj.Subsets[i].Addresses
		for j := range addresses {
			ips = append(ips, addresses[j].IP)
		}
	}
	return ips, nil
}
// deletePods deletes every pod in the package-level _namespace that matches
// the label selector, giving each pod a 30-second grace period.
//
// A failure to list the pods is asserted through f.GomegaT; failures of the
// individual deletions are ignored.
//
//nolint:unused
func (f *Framework) deletePods(selector string) {
	podClient := f.clientset.CoreV1().Pods(_namespace)

	matched, err := podClient.List(f.Context, metav1.ListOptions{LabelSelector: selector})
	f.GomegaT.Expect(err).ShouldNot(HaveOccurred(), "list pods")

	for i := range matched.Items {
		opts := metav1.DeleteOptions{GracePeriodSeconds: ptr.To(int64(30))}
		// Best effort: the deletion result is intentionally discarded.
		_ = podClient.Delete(f.Context, matched.Items[i].Name, opts)
	}
}
// CreateNamespaceWithTestService creates the namespace name and, inside it, a
// ClusterIP Service called "test" that exposes TCP port 80 (port name "http")
// and selects pods labelled app=httpbin.
//
// Objects that already exist are accepted silently. Any other creation error
// is asserted through f.GomegaT, and a namespace failure skips the Service.
func (f *Framework) CreateNamespaceWithTestService(name string) {
	core := f.clientset.CoreV1()

	ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}}
	if _, err := core.Namespaces().Create(f.Context, ns, metav1.CreateOptions{}); err != nil && !errors.IsAlreadyExists(err) {
		f.GomegaT.Expect(err).ShouldNot(HaveOccurred(), "create namespace")
		return
	}

	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test",
			Namespace: name,
		},
		Spec: corev1.ServiceSpec{
			Type:     corev1.ServiceTypeClusterIP,
			Selector: map[string]string{"app": "httpbin"},
			Ports: []corev1.ServicePort{
				{
					Name:     "http",
					Port:     80,
					Protocol: corev1.ProtocolTCP,
				},
			},
		},
	}
	if _, err := core.Services(name).Create(f.Context, svc, metav1.CreateOptions{}); err != nil && !errors.IsAlreadyExists(err) {
		f.GomegaT.Expect(err).ShouldNot(HaveOccurred(), "create service")
	}
}
// DeleteNamespace requests deletion of the namespace name. A namespace that
// does not exist is not treated as an error; any other failure is asserted
// through f.GomegaT.
func (f *Framework) DeleteNamespace(name string) {
	err := f.clientset.CoreV1().Namespaces().Delete(f.Context, name, metav1.DeleteOptions{})
	if err != nil && !errors.IsNotFound(err) {
		f.GomegaT.Expect(err).ShouldNot(HaveOccurred(), "delete namespace")
	}
}
// Scale sets the replica count of the Deployment name in the package-level
// _namespace and then waits, via ensureService, until the Endpoints object of
// the same name lists exactly that many addresses.
//
// It returns immediately when the Deployment already has the requested
// replica count. Every failure is asserted through f.GomegaT.
func (f *Framework) Scale(name string, replicas int32) {
	ctx := context.Background()
	deployments := f.clientset.AppsV1().Deployments(_namespace)

	current, err := deployments.GetScale(ctx, name, metav1.GetOptions{})
	f.GomegaT.Expect(err).ShouldNot(HaveOccurred(), fmt.Sprintf("get deployment %s scale failed", name))
	if current.Spec.Replicas == replicas {
		return
	}

	current.Spec.Replicas = replicas
	_, err = deployments.UpdateScale(ctx, name, current, metav1.UpdateOptions{})
	f.GomegaT.Expect(err).ShouldNot(HaveOccurred(), fmt.Sprintf("scale deployment %s to %v failed", name, replicas))

	// FIXME: The service name and the deployment name may not be the same
	err = f.ensureService(name, _namespace, int(replicas))
	f.GomegaT.Expect(err).ShouldNot(HaveOccurred(),
		fmt.Sprintf("ensure service %s/%s has %v endpoints failed", _namespace, name, replicas))
}
// GetPodIP returns the pod IP of the first pod that GetPods reports for the
// given namespace and label selector. It asserts through f.GomegaT that at
// least one pod matched.
func (f *Framework) GetPodIP(namespace, selector string) string {
	matched := f.GetPods(namespace, selector)
	f.GomegaT.Expect(matched).ShouldNot(BeEmpty())

	first := &matched[0]
	return first.Status.PodIP
}
// GetPods lists the pods matching the label selector in namespace, falling
// back to the package-level _namespace when namespace is empty. A list
// failure is asserted through f.GomegaT.
func (f *Framework) GetPods(namespace, selector string) []corev1.Pod {
	ns := cmp.Or(namespace, _namespace)
	opts := metav1.ListOptions{LabelSelector: selector}

	result, err := f.clientset.CoreV1().Pods(ns).List(f.Context, opts)
	f.GomegaT.Expect(err).ShouldNot(HaveOccurred())
	return result.Items
}
// applySSLSecret applies a kubernetes.io/tls Secret called name in namespace
// with field manager "e2e". The Secret carries cert under "tls.crt", pkey
// under "tls.key" and caCert under "ca.crt".
//
// It obtains its own client through terratest rather than using f.clientset,
// and asserts with the package-level Gomega Expect rather than f.GomegaT.
//
//nolint:unused
func (f *Framework) applySSLSecret(namespace, name string, cert, pkey, caCert []byte) {
	secret := applycorev1.SecretApplyConfiguration{
		TypeMetaApplyConfiguration: applymetav1.TypeMetaApplyConfiguration{
			Kind:       ptr.To("Secret"),
			APIVersion: ptr.To("v1"),
		},
		ObjectMetaApplyConfiguration: &applymetav1.ObjectMetaApplyConfiguration{
			Name: &name,
		},
		Type: ptr.To(corev1.SecretTypeTLS),
		Data: map[string][]byte{
			"tls.crt": cert,
			"tls.key": pkey,
			"ca.crt":  caCert,
		},
	}

	cli, err := k8s.GetKubernetesClientE(f.GinkgoT)
	Expect(err).ToNot(HaveOccurred())

	applyOpts := metav1.ApplyOptions{FieldManager: "e2e"}
	_, err = cli.CoreV1().Secrets(namespace).Apply(context.TODO(), &secret, applyOpts)
	Expect(err).ToNot(HaveOccurred(), "apply secret")
}
// WaitPodsAvailable waits, using waitExponentialBackoff, until the pods
// selected by opts exist and all of them are ready.
//
// An attempt succeeds only when at least one pod is listed and every listed
// pod has a PodReady condition whose status is True. A list error stops the
// wait immediately and is returned.
func WaitPodsAvailable(t testing.TestingT, kubeOps *k8s.KubectlOptions, opts metav1.ListOptions) error {
	// ready reports whether the pod carries a PodReady condition and no
	// PodReady condition with a status other than True.
	ready := func(pod *corev1.Pod) bool {
		seen := false
		for _, c := range pod.Status.Conditions {
			if c.Type != corev1.PodReady {
				continue
			}
			if c.Status != corev1.ConditionTrue {
				return false
			}
			seen = true
		}
		return seen
	}

	return waitExponentialBackoff(func() (bool, error) {
		pods, err := k8s.ListPodsE(t, kubeOps, opts)
		if err != nil {
			return false, err
		}
		if len(pods) == 0 {
			return false, nil
		}
		for i := range pods {
			if !ready(&pods[i]) {
				return false, nil
			}
		}
		return true, nil
	})
}
// waitExponentialBackoff runs condFunc through wait.ExponentialBackoff with a
// fixed schedule: an initial delay of 500ms, doubling after each attempt, for
// at most 8 steps. It returns whatever wait.ExponentialBackoff returns.
func waitExponentialBackoff(condFunc func() (bool, error)) error {
	return wait.ExponentialBackoff(wait.Backoff{
		Duration: 500 * time.Millisecond,
		Factor:   2,
		Steps:    8,
	}, condFunc)
}
// NewExpectResponse wraps httpBody, encoded as JSON, in an httpexpect.Response
// so it can be inspected with httpexpect matchers. The synthetic response has
// a Content-Type of application/json and no other fields set. A marshalling
// failure is asserted through f.GomegaT.
func (f *Framework) NewExpectResponse(httpBody any) *httpexpect.Response {
	payload, err := json.Marshal(httpBody)
	f.GomegaT.Expect(err).ShouldNot(HaveOccurred())

	header := http.Header{}
	header.Set("Content-Type", "application/json")

	resp := &http.Response{
		Header: header,
		Body:   io.NopCloser(bytes.NewBuffer(payload)),
	}
	return httpexpect.NewResponse(f.GinkgoT, resp)
}
// ListPods returns the pods in the package-level _namespace that match the
// label selector. A list failure is asserted through f.GomegaT, with the
// selector included in the failure description.
func (f *Framework) ListPods(selector string) []corev1.Pod {
	pods, err := f.clientset.CoreV1().Pods(_namespace).List(context.TODO(), metav1.ListOptions{
		LabelSelector: selector,
	})
	// Gomega formats the description with fmt.Sprintf, so the selector needs
	// an explicit verb; without one it is rendered as "%!(EXTRA string=...)".
	f.GomegaT.Expect(err).ShouldNot(HaveOccurred(), "list pod: %s", selector)
	return pods.Items
}
// ListRunningPods returns the pods in the package-level _namespace that match
// the label selector, are in the Running phase and are not being deleted
// (their DeletionTimestamp is unset).
//
// The result is always a non-nil slice. A list failure is asserted through
// f.GomegaT, with the selector included in the failure description.
func (f *Framework) ListRunningPods(selector string) []corev1.Pod {
	pods, err := f.clientset.CoreV1().Pods(_namespace).List(context.TODO(), metav1.ListOptions{
		LabelSelector: selector,
	})
	// Gomega formats the description with fmt.Sprintf, so the selector needs
	// an explicit verb; without one it is rendered as "%!(EXTRA string=...)".
	f.GomegaT.Expect(err).ShouldNot(HaveOccurred(), "list pod: %s", selector)

	runningPods := make([]corev1.Pod, 0)
	for _, p := range pods.Items {
		if p.Status.Phase == corev1.PodRunning && p.DeletionTimestamp == nil {
			runningPods = append(runningPods, p)
		}
	}
	return runningPods
}
// ExecCommandInPod runs cmd inside the pod podName in the package-level
// _namespace through the Kubernetes exec subresource, without stdin or a TTY.
//
// It returns the captured stdout and stderr, in that order, each with
// surrounding whitespace trimmed. A failure to set up the executor is
// asserted through f.GomegaT; the error returned by the stream itself is
// discarded, so callers only ever see the captured output.
func (f *Framework) ExecCommandInPod(podName string, cmd ...string) (string, string) {
	execOpts := &corev1.PodExecOptions{
		Command: cmd,
		Stdout:  true,
		Stderr:  true,
	}
	req := f.clientset.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(podName).
		Namespace(_namespace).
		SubResource("exec").
		VersionedParams(execOpts, scheme.ParameterCodec)

	executor, err := remotecommand.NewSPDYExecutor(f.restConfig, "POST", req.URL())
	f.GomegaT.Expect(err).ShouldNot(HaveOccurred(), "request kubernetes exec api")

	var outBuf, errBuf bytes.Buffer
	// NOTE(review): the stream error is dropped on purpose here; presumably a
	// failing command is expected to be diagnosed from its stderr — confirm.
	_ = executor.StreamWithContext(context.TODO(), remotecommand.StreamOptions{
		Stdout: &outBuf,
		Stderr: &errBuf,
	})

	return strings.TrimSpace(outBuf.String()), strings.TrimSpace(errBuf.String())
}
// GetPodLogs reads the complete log stream of the pod name in the
// package-level _namespace and returns it as a string. previous is passed
// through as PodLogOptions.Previous. Failures to open or read the stream are
// asserted through f.GomegaT.
func (f *Framework) GetPodLogs(name string, previous bool) string {
	opts := &corev1.PodLogOptions{Previous: previous}

	stream, err := f.clientset.CoreV1().Pods(_namespace).GetLogs(name, opts).Stream(context.Background())
	f.GomegaT.Expect(err).ShouldNot(HaveOccurred(), "get logs")
	defer func() { _ = stream.Close() }()

	content, err := io.ReadAll(stream)
	f.GomegaT.Expect(err).ShouldNot(HaveOccurred(), "read all logs")
	return string(content)
}
// WaitControllerManagerLog is WaitPodsLog applied to the pods labelled
// control-plane=controller-manager; keyword, sinceSeconds and timeout are
// forwarded unchanged.
func (f *Framework) WaitControllerManagerLog(keyword string, sinceSeconds int64, timeout time.Duration) {
	const controllerManagerSelector = "control-plane=controller-manager"
	f.WaitPodsLog(controllerManagerSelector, keyword, sinceSeconds, timeout)
}
// WaitPodsLog follows the logs of every running pod matched by selector (as
// reported by ListRunningPods) and returns once each of them has produced a
// line containing keyword.
//
// When sinceSeconds is positive the streams start that many seconds in the
// past; otherwise TailLines is set to 0 so that only output produced from now
// on is considered. If no running pod matches, the function returns
// immediately.
//
// Failures are reported on the calling goroutine:
//   - if timeout elapses first, f.GinkgoT.Error("wait log timeout") is called;
//   - if a log stream cannot be opened, fails while being read, or ends
//     before the keyword appears, the error is asserted through f.GomegaT.
//
// All log streams are bound to a context that is cancelled when the function
// returns, so no follower goroutine outlives the call.
func (f *Framework) WaitPodsLog(selector, keyword string, sinceSeconds int64, timeout time.Duration) {
	pods := f.ListRunningPods(selector)

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// watch follows one pod's log and returns nil as soon as keyword is seen.
	watch := func(p corev1.Pod) error {
		opts := corev1.PodLogOptions{Follow: true}
		if sinceSeconds > 0 {
			opts.SinceSeconds = ptr.To(sinceSeconds)
		} else {
			opts.TailLines = ptr.To(int64(0))
		}

		logStream, err := f.clientset.CoreV1().Pods(p.Namespace).GetLogs(p.Name, &opts).Stream(ctx)
		if err != nil {
			return fmt.Errorf("open log stream of pod %s/%s: %w", p.Namespace, p.Name, err)
		}
		defer func() {
			_ = logStream.Close()
		}()

		scanner := bufio.NewScanner(logStream)
		// Allow lines up to 1 MiB so a single long log entry does not abort
		// the scan with bufio.ErrTooLong.
		scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), 1024*1024)
		for scanner.Scan() {
			if strings.Contains(scanner.Text(), keyword) {
				return nil
			}
		}
		if err := scanner.Err(); err != nil {
			return fmt.Errorf("read log stream of pod %s/%s: %w", p.Namespace, p.Name, err)
		}
		// A stream that ends without the keyword must not count as a match.
		return fmt.Errorf("log stream of pod %s/%s ended before %q appeared", p.Namespace, p.Name, keyword)
	}

	// Buffered so that a watcher can always report and exit, even after this
	// function has stopped listening.
	failures := make(chan error, len(pods))
	var wg sync.WaitGroup
	for _, p := range pods {
		wg.Add(1)
		go func(p corev1.Pod) {
			defer wg.Done()
			if err := watch(p); err != nil {
				failures <- err
			}
		}(p)
	}

	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()

	var failure error
	select {
	case <-done:
		// Every watcher has returned; pick up a failure that was reported
		// just before the last one finished.
		select {
		case failure = <-failures:
		default:
		}
	case failure = <-failures:
	case <-ctx.Done():
		failure = ctx.Err()
	}

	if failure == nil {
		return
	}
	if ctx.Err() != nil {
		// Errors caused by the deadline are reported as a plain timeout.
		f.GinkgoT.Error("wait log timeout")
		return
	}
	f.GomegaT.Expect(failure).ShouldNot(HaveOccurred(), "wait pods log")
}