blob: 8bd4c931b5bf2dcc57b27ba8fa5f7ec4e012b091 [file] [log] [blame]
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package framework contains provider-independent helper code for
// building and running E2E tests with Ginkgo. The actual Ginkgo test
// suites gets assembled by combining this framework, the optional
// provider support code and specific tests via a separate .go file
// like Kubernetes' test/e2e.go.
package framework
import (
"bufio"
"bytes"
"fmt"
"math/rand"
"os"
"strings"
"sync"
"time"
"k8s.io/api/core/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
scaleclient "k8s.io/client-go/scale"
csi "k8s.io/csi-api/pkg/client/clientset/versioned"
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/test/e2e/framework/metrics"
testutils "k8s.io/kubernetes/test/utils"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
const (
maxKubectlExecRetries = 5
// TODO(mikedanese): reset this to 5 minutes once #47135 is resolved.
// ref https://github.com/kubernetes/kubernetes/issues/47135
DefaultNamespaceDeletionTimeout = 10 * time.Minute
)
// Framework supports common operations used by e2e tests; it will keep a client & a namespace for you.
// Eventual goal is to merge this with integration test framework.
type Framework struct {
BaseName string
// Set together with creating the ClientSet and the namespace.
// Guaranteed to be unique in the cluster even when running the same
// test multiple times in parallel.
UniqueName string
ClientSet clientset.Interface
KubemarkExternalClusterClientSet clientset.Interface
APIExtensionsClientSet apiextensionsclient.Interface
CSIClientSet csi.Interface
InternalClientset *internalclientset.Clientset
AggregatorClient *aggregatorclient.Clientset
DynamicClient dynamic.Interface
ScalesGetter scaleclient.ScalesGetter
SkipNamespaceCreation bool // Whether to skip creating a namespace
Namespace *v1.Namespace // Every test has at least one namespace unless creation is skipped
namespacesToDelete []*v1.Namespace // Some tests have more than one.
NamespaceDeletionTimeout time.Duration
SkipPrivilegedPSPBinding bool // Whether to skip creating a binding to the privileged PSP in the test namespace
gatherer *ContainerResourceGatherer
// Constraints that passed to a check which is executed after data is gathered to
// see if 99% of results are within acceptable bounds. It has to be injected in the test,
// as expectations vary greatly. Constraints are grouped by the container names.
AddonResourceConstraints map[string]ResourceConstraint
logsSizeWaitGroup sync.WaitGroup
logsSizeCloseChannel chan bool
logsSizeVerifier *LogsSizeVerifier
// Flaky operation failures in an e2e test can be captured through this.
flakeReport *FlakeReport
// To make sure that this framework cleans up after itself, no matter what,
// we install a Cleanup action before each test and clear it after. If we
// should abort, the AfterSuite hook should run all Cleanup actions.
cleanupHandle CleanupActionHandle
// configuration for framework's client
Options FrameworkOptions
// Place where various additional data is stored during test run to be printed to ReportDir,
// or stdout if ReportDir is not set once test ends.
TestSummaries []TestDataSummary
// Place to keep ClusterAutoscaler metrics from before test in order to compute delta.
clusterAutoscalerMetricsBeforeTest metrics.MetricsCollection
}
type TestDataSummary interface {
SummaryKind() string
PrintHumanReadable() string
PrintJSON() string
}
type FrameworkOptions struct {
ClientQPS float32
ClientBurst int
GroupVersion *schema.GroupVersion
}
// NewFramework makes a new framework and sets up a BeforeEach/AfterEach for
// you (you can write additional before/after each functions).
func NewDefaultFramework(baseName string) *Framework {
options := FrameworkOptions{
ClientQPS: 20,
ClientBurst: 50,
}
return NewFramework(baseName, options, nil)
}
func NewFramework(baseName string, options FrameworkOptions, client clientset.Interface) *Framework {
f := &Framework{
BaseName: baseName,
AddonResourceConstraints: make(map[string]ResourceConstraint),
Options: options,
ClientSet: client,
}
BeforeEach(f.BeforeEach)
AfterEach(f.AfterEach)
return f
}
// BeforeEach gets a client and makes a namespace.
func (f *Framework) BeforeEach() {
// The fact that we need this feels like a bug in ginkgo.
// https://github.com/onsi/ginkgo/issues/222
f.cleanupHandle = AddCleanupAction(f.AfterEach)
if f.ClientSet == nil {
By("Creating a kubernetes client")
config, err := LoadConfig()
testDesc := CurrentGinkgoTestDescription()
if len(testDesc.ComponentTexts) > 0 {
componentTexts := strings.Join(testDesc.ComponentTexts, " ")
config.UserAgent = fmt.Sprintf(
"%v -- %v",
rest.DefaultKubernetesUserAgent(),
componentTexts)
}
Expect(err).NotTo(HaveOccurred())
config.QPS = f.Options.ClientQPS
config.Burst = f.Options.ClientBurst
if f.Options.GroupVersion != nil {
config.GroupVersion = f.Options.GroupVersion
}
if TestContext.KubeAPIContentType != "" {
config.ContentType = TestContext.KubeAPIContentType
}
f.ClientSet, err = clientset.NewForConfig(config)
Expect(err).NotTo(HaveOccurred())
f.APIExtensionsClientSet, err = apiextensionsclient.NewForConfig(config)
Expect(err).NotTo(HaveOccurred())
f.InternalClientset, err = internalclientset.NewForConfig(config)
Expect(err).NotTo(HaveOccurred())
f.AggregatorClient, err = aggregatorclient.NewForConfig(config)
Expect(err).NotTo(HaveOccurred())
f.DynamicClient, err = dynamic.NewForConfig(config)
Expect(err).NotTo(HaveOccurred())
// csi.storage.k8s.io is based on CRD, which is served only as JSON
jsonConfig := config
jsonConfig.ContentType = "application/json"
f.CSIClientSet, err = csi.NewForConfig(jsonConfig)
Expect(err).NotTo(HaveOccurred())
// create scales getter, set GroupVersion and NegotiatedSerializer to default values
// as they are required when creating a REST client.
if config.GroupVersion == nil {
config.GroupVersion = &schema.GroupVersion{}
}
if config.NegotiatedSerializer == nil {
config.NegotiatedSerializer = legacyscheme.Codecs
}
restClient, err := rest.RESTClientFor(config)
Expect(err).NotTo(HaveOccurred())
discoClient, err := discovery.NewDiscoveryClientForConfig(config)
Expect(err).NotTo(HaveOccurred())
cachedDiscoClient := cacheddiscovery.NewMemCacheClient(discoClient)
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscoClient)
restMapper.Reset()
resolver := scaleclient.NewDiscoveryScaleKindResolver(cachedDiscoClient)
f.ScalesGetter = scaleclient.New(restClient, restMapper, dynamic.LegacyAPIPathResolverFunc, resolver)
TestContext.CloudConfig.Provider.FrameworkBeforeEach(f)
}
if !f.SkipNamespaceCreation {
By(fmt.Sprintf("Building a namespace api object, basename %s", f.BaseName))
namespace, err := f.CreateNamespace(f.BaseName, map[string]string{
"e2e-framework": f.BaseName,
})
Expect(err).NotTo(HaveOccurred())
f.Namespace = namespace
if TestContext.VerifyServiceAccount {
By("Waiting for a default service account to be provisioned in namespace")
err = WaitForDefaultServiceAccountInNamespace(f.ClientSet, namespace.Name)
Expect(err).NotTo(HaveOccurred())
} else {
Logf("Skipping waiting for service account")
}
f.UniqueName = f.Namespace.GetName()
} else {
// not guaranteed to be unique, but very likely
f.UniqueName = fmt.Sprintf("%s-%08x", f.BaseName, rand.Int31())
}
if TestContext.GatherKubeSystemResourceUsageData != "false" && TestContext.GatherKubeSystemResourceUsageData != "none" {
var err error
var nodeMode NodesSet
switch TestContext.GatherKubeSystemResourceUsageData {
case "master":
nodeMode = MasterNodes
case "masteranddns":
nodeMode = MasterAndDNSNodes
default:
nodeMode = AllNodes
}
f.gatherer, err = NewResourceUsageGatherer(f.ClientSet, ResourceGathererOptions{
InKubemark: ProviderIs("kubemark"),
Nodes: nodeMode,
ResourceDataGatheringPeriod: 60 * time.Second,
ProbeDuration: 15 * time.Second,
PrintVerboseLogs: false,
}, nil)
if err != nil {
Logf("Error while creating NewResourceUsageGatherer: %v", err)
} else {
go f.gatherer.StartGatheringData()
}
}
if TestContext.GatherLogsSizes {
f.logsSizeWaitGroup = sync.WaitGroup{}
f.logsSizeWaitGroup.Add(1)
f.logsSizeCloseChannel = make(chan bool)
f.logsSizeVerifier = NewLogsVerifier(f.ClientSet, f.logsSizeCloseChannel)
go func() {
f.logsSizeVerifier.Run()
f.logsSizeWaitGroup.Done()
}()
}
gatherMetricsAfterTest := TestContext.GatherMetricsAfterTest == "true" || TestContext.GatherMetricsAfterTest == "master"
if gatherMetricsAfterTest && TestContext.IncludeClusterAutoscalerMetrics {
grabber, err := metrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, !ProviderIs("kubemark"), false, false, false, TestContext.IncludeClusterAutoscalerMetrics)
if err != nil {
Logf("Failed to create MetricsGrabber (skipping ClusterAutoscaler metrics gathering before test): %v", err)
} else {
f.clusterAutoscalerMetricsBeforeTest, err = grabber.Grab()
if err != nil {
Logf("MetricsGrabber failed to grab CA metrics before test (skipping metrics gathering): %v", err)
} else {
Logf("Gathered ClusterAutoscaler metrics before test")
}
}
}
f.flakeReport = NewFlakeReport()
}
// AfterEach deletes the namespace, after reading its events.
func (f *Framework) AfterEach() {
RemoveCleanupAction(f.cleanupHandle)
// DeleteNamespace at the very end in defer, to avoid any
// expectation failures preventing deleting the namespace.
defer func() {
nsDeletionErrors := map[string]error{}
// Whether to delete namespace is determined by 3 factors: delete-namespace flag, delete-namespace-on-failure flag and the test result
// if delete-namespace set to false, namespace will always be preserved.
// if delete-namespace is true and delete-namespace-on-failure is false, namespace will be preserved if test failed.
if TestContext.DeleteNamespace && (TestContext.DeleteNamespaceOnFailure || !CurrentGinkgoTestDescription().Failed) {
for _, ns := range f.namespacesToDelete {
By(fmt.Sprintf("Destroying namespace %q for this suite.", ns.Name))
timeout := DefaultNamespaceDeletionTimeout
if f.NamespaceDeletionTimeout != 0 {
timeout = f.NamespaceDeletionTimeout
}
if err := deleteNS(f.ClientSet, f.DynamicClient, ns.Name, timeout); err != nil {
if !apierrors.IsNotFound(err) {
nsDeletionErrors[ns.Name] = err
} else {
Logf("Namespace %v was already deleted", ns.Name)
}
}
}
} else {
if !TestContext.DeleteNamespace {
Logf("Found DeleteNamespace=false, skipping namespace deletion!")
} else {
Logf("Found DeleteNamespaceOnFailure=false and current test failed, skipping namespace deletion!")
}
}
// Paranoia-- prevent reuse!
f.Namespace = nil
f.ClientSet = nil
f.namespacesToDelete = nil
// if we had errors deleting, report them now.
if len(nsDeletionErrors) != 0 {
messages := []string{}
for namespaceKey, namespaceErr := range nsDeletionErrors {
messages = append(messages, fmt.Sprintf("Couldn't delete ns: %q: %s (%#v)", namespaceKey, namespaceErr, namespaceErr))
}
Failf(strings.Join(messages, ","))
}
}()
// Print events if the test failed.
if CurrentGinkgoTestDescription().Failed && TestContext.DumpLogsOnFailure {
// Pass both unversioned client and versioned clientset, till we have removed all uses of the unversioned client.
if !f.SkipNamespaceCreation {
DumpAllNamespaceInfo(f.ClientSet, f.Namespace.Name)
}
}
if TestContext.GatherKubeSystemResourceUsageData != "false" && TestContext.GatherKubeSystemResourceUsageData != "none" && f.gatherer != nil {
By("Collecting resource usage data")
summary, resourceViolationError := f.gatherer.StopAndSummarize([]int{90, 99, 100}, f.AddonResourceConstraints)
defer ExpectNoError(resourceViolationError)
f.TestSummaries = append(f.TestSummaries, summary)
}
if TestContext.GatherLogsSizes {
By("Gathering log sizes data")
close(f.logsSizeCloseChannel)
f.logsSizeWaitGroup.Wait()
f.TestSummaries = append(f.TestSummaries, f.logsSizeVerifier.GetSummary())
}
if TestContext.GatherMetricsAfterTest != "false" {
By("Gathering metrics")
// Grab apiserver, scheduler, controller-manager metrics and (optionally) nodes' kubelet metrics.
grabMetricsFromKubelets := TestContext.GatherMetricsAfterTest != "master" && !ProviderIs("kubemark")
grabber, err := metrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, grabMetricsFromKubelets, true, true, true, TestContext.IncludeClusterAutoscalerMetrics)
if err != nil {
Logf("Failed to create MetricsGrabber (skipping metrics gathering): %v", err)
} else {
received, err := grabber.Grab()
if err != nil {
Logf("MetricsGrabber failed to grab some of the metrics: %v", err)
}
(*MetricsForE2E)(&received).computeClusterAutoscalerMetricsDelta(f.clusterAutoscalerMetricsBeforeTest)
f.TestSummaries = append(f.TestSummaries, (*MetricsForE2E)(&received))
}
}
TestContext.CloudConfig.Provider.FrameworkAfterEach(f)
// Report any flakes that were observed in the e2e test and reset.
if f.flakeReport != nil && f.flakeReport.GetFlakeCount() > 0 {
f.TestSummaries = append(f.TestSummaries, f.flakeReport)
f.flakeReport = nil
}
PrintSummaries(f.TestSummaries, f.BaseName)
// Check whether all nodes are ready after the test.
// This is explicitly done at the very end of the test, to avoid
// e.g. not removing namespace in case of this failure.
if err := AllNodesReady(f.ClientSet, 3*time.Minute); err != nil {
Failf("All nodes should be ready after test, %v", err)
}
}
func (f *Framework) CreateNamespace(baseName string, labels map[string]string) (*v1.Namespace, error) {
createTestingNS := TestContext.CreateTestingNS
if createTestingNS == nil {
createTestingNS = CreateTestingNS
}
ns, err := createTestingNS(baseName, f.ClientSet, labels)
// check ns instead of err to see if it's nil as we may
// fail to create serviceAccount in it.
f.AddNamespacesToDelete(ns)
if err == nil && !f.SkipPrivilegedPSPBinding {
CreatePrivilegedPSPBinding(f, ns.Name)
}
return ns, err
}
func (f *Framework) RecordFlakeIfError(err error, optionalDescription ...interface{}) {
f.flakeReport.RecordFlakeIfError(err, optionalDescription)
}
// AddNamespacesToDelete adds one or more namespaces to be deleted when the test
// completes.
func (f *Framework) AddNamespacesToDelete(namespaces ...*v1.Namespace) {
for _, ns := range namespaces {
if ns == nil {
continue
}
f.namespacesToDelete = append(f.namespacesToDelete, ns)
}
}
// WaitForPodTerminated waits for the pod to be terminated with the given reason.
func (f *Framework) WaitForPodTerminated(podName, reason string) error {
return waitForPodTerminatedInNamespace(f.ClientSet, podName, reason, f.Namespace.Name)
}
// WaitForPodNotFound waits for the pod to be completely terminated (not "Get-able").
func (f *Framework) WaitForPodNotFound(podName string, timeout time.Duration) error {
return waitForPodNotFoundInNamespace(f.ClientSet, podName, f.Namespace.Name, timeout)
}
// WaitForPodRunning waits for the pod to run in the namespace.
func (f *Framework) WaitForPodRunning(podName string) error {
return WaitForPodNameRunningInNamespace(f.ClientSet, podName, f.Namespace.Name)
}
// WaitForPodReady waits for the pod to flip to ready in the namespace.
func (f *Framework) WaitForPodReady(podName string) error {
return waitTimeoutForPodReadyInNamespace(f.ClientSet, podName, f.Namespace.Name, PodStartTimeout)
}
// WaitForPodRunningSlow waits for the pod to run in the namespace.
// It has a longer timeout then WaitForPodRunning (util.slowPodStartTimeout).
func (f *Framework) WaitForPodRunningSlow(podName string) error {
return waitForPodRunningInNamespaceSlow(f.ClientSet, podName, f.Namespace.Name)
}
// WaitForPodNoLongerRunning waits for the pod to no longer be running in the namespace, for either
// success or failure.
func (f *Framework) WaitForPodNoLongerRunning(podName string) error {
return WaitForPodNoLongerRunningInNamespace(f.ClientSet, podName, f.Namespace.Name)
}
// TestContainerOutput runs the given pod in the given namespace and waits
// for all of the containers in the podSpec to move into the 'Success' status, and tests
// the specified container log against the given expected output using a substring matcher.
func (f *Framework) TestContainerOutput(scenarioName string, pod *v1.Pod, containerIndex int, expectedOutput []string) {
f.testContainerOutputMatcher(scenarioName, pod, containerIndex, expectedOutput, ContainSubstring)
}
// TestContainerOutputRegexp runs the given pod in the given namespace and waits
// for all of the containers in the podSpec to move into the 'Success' status, and tests
// the specified container log against the given expected output using a regexp matcher.
func (f *Framework) TestContainerOutputRegexp(scenarioName string, pod *v1.Pod, containerIndex int, expectedOutput []string) {
f.testContainerOutputMatcher(scenarioName, pod, containerIndex, expectedOutput, MatchRegexp)
}
// Write a file using kubectl exec echo <contents> > <path> via specified container
// Because of the primitive technique we're using here, we only allow ASCII alphanumeric characters
func (f *Framework) WriteFileViaContainer(podName, containerName string, path string, contents string) error {
By("writing a file in the container")
allowedCharacters := "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
for _, c := range contents {
if !strings.ContainsRune(allowedCharacters, c) {
return fmt.Errorf("Unsupported character in string to write: %v", c)
}
}
command := fmt.Sprintf("echo '%s' > '%s'", contents, path)
stdout, stderr, err := kubectlExecWithRetry(f.Namespace.Name, podName, containerName, "--", "/bin/sh", "-c", command)
if err != nil {
Logf("error running kubectl exec to write file: %v\nstdout=%v\nstderr=%v)", err, string(stdout), string(stderr))
}
return err
}
// Read a file using kubectl exec cat <path>
func (f *Framework) ReadFileViaContainer(podName, containerName string, path string) (string, error) {
By("reading a file in the container")
stdout, stderr, err := kubectlExecWithRetry(f.Namespace.Name, podName, containerName, "--", "cat", path)
if err != nil {
Logf("error running kubectl exec to read file: %v\nstdout=%v\nstderr=%v)", err, string(stdout), string(stderr))
}
return string(stdout), err
}
func (f *Framework) CheckFileSizeViaContainer(podName, containerName, path string) (string, error) {
By("checking a file size in the container")
stdout, stderr, err := kubectlExecWithRetry(f.Namespace.Name, podName, containerName, "--", "ls", "-l", path)
if err != nil {
Logf("error running kubectl exec to read file: %v\nstdout=%v\nstderr=%v)", err, string(stdout), string(stderr))
}
return string(stdout), err
}
// CreateServiceForSimpleAppWithPods is a convenience wrapper to create a service and its matching pods all at once.
func (f *Framework) CreateServiceForSimpleAppWithPods(contPort int, svcPort int, appName string, podSpec func(n v1.Node) v1.PodSpec, count int, block bool) (error, *v1.Service) {
var err error = nil
theService := f.CreateServiceForSimpleApp(contPort, svcPort, appName)
f.CreatePodsPerNodeForSimpleApp(appName, podSpec, count)
if block {
err = testutils.WaitForPodsWithLabelRunning(f.ClientSet, f.Namespace.Name, labels.SelectorFromSet(labels.Set(theService.Spec.Selector)))
}
return err, theService
}
// CreateServiceForSimpleApp returns a service that selects/exposes pods (send -1 ports if no exposure needed) with an app label.
func (f *Framework) CreateServiceForSimpleApp(contPort, svcPort int, appName string) *v1.Service {
if appName == "" {
panic(fmt.Sprintf("no app name provided"))
}
serviceSelector := map[string]string{
"app": appName + "-pod",
}
// For convenience, user sending ports are optional.
portsFunc := func() []v1.ServicePort {
if contPort < 1 || svcPort < 1 {
return nil
} else {
return []v1.ServicePort{{
Protocol: v1.ProtocolTCP,
Port: int32(svcPort),
TargetPort: intstr.FromInt(contPort),
}}
}
}
Logf("Creating a service-for-%v for selecting app=%v-pod", appName, appName)
service, err := f.ClientSet.CoreV1().Services(f.Namespace.Name).Create(&v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "service-for-" + appName,
Labels: map[string]string{
"app": appName + "-service",
},
},
Spec: v1.ServiceSpec{
Ports: portsFunc(),
Selector: serviceSelector,
},
})
ExpectNoError(err)
return service
}
// CreatePodsPerNodeForSimpleApp Creates pods w/ labels. Useful for tests which make a bunch of pods w/o any networking.
func (f *Framework) CreatePodsPerNodeForSimpleApp(appName string, podSpec func(n v1.Node) v1.PodSpec, maxCount int) map[string]string {
nodes := GetReadySchedulableNodesOrDie(f.ClientSet)
labels := map[string]string{
"app": appName + "-pod",
}
for i, node := range nodes.Items {
// one per node, but no more than maxCount.
if i <= maxCount {
Logf("%v/%v : Creating container with label app=%v-pod", i, maxCount, appName)
_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf(appName+"-pod-%v", i),
Labels: labels,
},
Spec: podSpec(node),
})
ExpectNoError(err)
}
}
return labels
}
type KubeUser struct {
Name string `yaml:"name"`
User struct {
Username string `yaml:"username"`
Password string `yaml:"password"`
Token string `yaml:"token"`
} `yaml:"user"`
}
type KubeCluster struct {
Name string `yaml:"name"`
Cluster struct {
CertificateAuthorityData string `yaml:"certificate-authority-data"`
Server string `yaml:"server"`
} `yaml:"cluster"`
}
type KubeConfig struct {
Contexts []struct {
Name string `yaml:"name"`
Context struct {
Cluster string `yaml:"cluster"`
User string
} `yaml:"context"`
} `yaml:"contexts"`
Clusters []KubeCluster `yaml:"clusters"`
Users []KubeUser `yaml:"users"`
}
func (kc *KubeConfig) FindUser(name string) *KubeUser {
for _, user := range kc.Users {
if user.Name == name {
return &user
}
}
return nil
}
func (kc *KubeConfig) FindCluster(name string) *KubeCluster {
for _, cluster := range kc.Clusters {
if cluster.Name == name {
return &cluster
}
}
return nil
}
func kubectlExecWithRetry(namespace string, podName, containerName string, args ...string) ([]byte, []byte, error) {
for numRetries := 0; numRetries < maxKubectlExecRetries; numRetries++ {
if numRetries > 0 {
Logf("Retrying kubectl exec (retry count=%v/%v)", numRetries+1, maxKubectlExecRetries)
}
stdOutBytes, stdErrBytes, err := kubectlExec(namespace, podName, containerName, args...)
if err != nil {
if strings.Contains(strings.ToLower(string(stdErrBytes)), "i/o timeout") {
// Retry on "i/o timeout" errors
Logf("Warning: kubectl exec encountered i/o timeout.\nerr=%v\nstdout=%v\nstderr=%v)", err, string(stdOutBytes), string(stdErrBytes))
continue
}
if strings.Contains(strings.ToLower(string(stdErrBytes)), "container not found") {
// Retry on "container not found" errors
Logf("Warning: kubectl exec encountered container not found.\nerr=%v\nstdout=%v\nstderr=%v)", err, string(stdOutBytes), string(stdErrBytes))
time.Sleep(2 * time.Second)
continue
}
}
return stdOutBytes, stdErrBytes, err
}
err := fmt.Errorf("Failed: kubectl exec failed %d times with \"i/o timeout\". Giving up.", maxKubectlExecRetries)
return nil, nil, err
}
func kubectlExec(namespace string, podName, containerName string, args ...string) ([]byte, []byte, error) {
var stdout, stderr bytes.Buffer
cmdArgs := []string{
"exec",
fmt.Sprintf("--namespace=%v", namespace),
podName,
fmt.Sprintf("-c=%v", containerName),
}
cmdArgs = append(cmdArgs, args...)
cmd := KubectlCmd(cmdArgs...)
cmd.Stdout, cmd.Stderr = &stdout, &stderr
Logf("Running '%s %s'", cmd.Path, strings.Join(cmdArgs, " "))
err := cmd.Run()
return stdout.Bytes(), stderr.Bytes(), err
}
// Wrapper function for ginkgo describe. Adds namespacing.
// TODO: Support type safe tagging as well https://github.com/kubernetes/kubernetes/pull/22401.
func KubeDescribe(text string, body func()) bool {
return Describe("[k8s.io] "+text, body)
}
// Wrapper function for ginkgo It. Adds "[Conformance]" tag and makes static analysis easier.
func ConformanceIt(text string, body interface{}, timeout ...float64) bool {
return It(text+" [Conformance]", body, timeout...)
}
// PodStateVerification represents a verification of pod state.
// Any time you have a set of pods that you want to operate against or query,
// this struct can be used to declaratively identify those pods.
type PodStateVerification struct {
// Optional: only pods that have k=v labels will pass this filter.
Selectors map[string]string
// Required: The phases which are valid for your pod.
ValidPhases []v1.PodPhase
// Optional: only pods passing this function will pass the filter
// Verify a pod.
// As an optimization, in addition to specfying filter (boolean),
// this function allows specifying an error as well.
// The error indicates that the polling of the pod spectrum should stop.
Verify func(v1.Pod) (bool, error)
// Optional: only pods with this name will pass the filter.
PodName string
}
type ClusterVerification struct {
client clientset.Interface
namespace *v1.Namespace // pointer rather than string, since ns isn't created until before each.
podState PodStateVerification
}
func (f *Framework) NewClusterVerification(namespace *v1.Namespace, filter PodStateVerification) *ClusterVerification {
return &ClusterVerification{
f.ClientSet,
namespace,
filter,
}
}
func passesPodNameFilter(pod v1.Pod, name string) bool {
return name == "" || strings.Contains(pod.Name, name)
}
func passesVerifyFilter(pod v1.Pod, verify func(p v1.Pod) (bool, error)) (bool, error) {
if verify == nil {
return true, nil
} else {
verified, err := verify(pod)
// If an error is returned, by definition, pod verification fails
if err != nil {
return false, err
} else {
return verified, nil
}
}
}
func passesPhasesFilter(pod v1.Pod, validPhases []v1.PodPhase) bool {
passesPhaseFilter := false
for _, phase := range validPhases {
if pod.Status.Phase == phase {
passesPhaseFilter = true
}
}
return passesPhaseFilter
}
// filterLabels returns a list of pods which have labels.
func filterLabels(selectors map[string]string, cli clientset.Interface, ns string) (*v1.PodList, error) {
var err error
var selector labels.Selector
var pl *v1.PodList
// List pods based on selectors. This might be a tiny optimization rather then filtering
// everything manually.
if len(selectors) > 0 {
selector = labels.SelectorFromSet(labels.Set(selectors))
options := metav1.ListOptions{LabelSelector: selector.String()}
pl, err = cli.CoreV1().Pods(ns).List(options)
} else {
pl, err = cli.CoreV1().Pods(ns).List(metav1.ListOptions{})
}
return pl, err
}
// filter filters pods which pass a filter. It can be used to compose
// the more useful abstractions like ForEach, WaitFor, and so on, which
// can be used directly by tests.
func (p *PodStateVerification) filter(c clientset.Interface, namespace *v1.Namespace) ([]v1.Pod, error) {
if len(p.ValidPhases) == 0 || namespace == nil {
panic(fmt.Errorf("Need to specify a valid pod phases (%v) and namespace (%v). ", p.ValidPhases, namespace))
}
ns := namespace.Name
pl, err := filterLabels(p.Selectors, c, ns) // Build an v1.PodList to operate against.
Logf("Selector matched %v pods for %v", len(pl.Items), p.Selectors)
if len(pl.Items) == 0 || err != nil {
return pl.Items, err
}
unfilteredPods := pl.Items
filteredPods := []v1.Pod{}
ReturnPodsSoFar:
// Next: Pod must match at least one of the states that the user specified
for _, pod := range unfilteredPods {
if !(passesPhasesFilter(pod, p.ValidPhases) && passesPodNameFilter(pod, p.PodName)) {
continue
}
passesVerify, err := passesVerifyFilter(pod, p.Verify)
if err != nil {
Logf("Error detected on %v : %v !", pod.Name, err)
break ReturnPodsSoFar
}
if passesVerify {
filteredPods = append(filteredPods, pod)
}
}
return filteredPods, err
}
// WaitFor waits for some minimum number of pods to be verified, according to the PodStateVerification
// definition.
func (cl *ClusterVerification) WaitFor(atLeast int, timeout time.Duration) ([]v1.Pod, error) {
pods := []v1.Pod{}
var returnedErr error
err := wait.Poll(1*time.Second, timeout, func() (bool, error) {
pods, returnedErr = cl.podState.filter(cl.client, cl.namespace)
// Failure
if returnedErr != nil {
Logf("Cutting polling short: We got an error from the pod filtering layer.")
// stop polling if the pod filtering returns an error. that should never happen.
// it indicates, for example, that the client is broken or something non-pod related.
return false, returnedErr
}
Logf("Found %v / %v", len(pods), atLeast)
// Success
if len(pods) >= atLeast {
return true, nil
}
// Keep trying...
return false, nil
})
Logf("WaitFor completed with timeout %v. Pods found = %v out of %v", timeout, len(pods), atLeast)
return pods, err
}
// WaitForOrFail provides a shorthand WaitFor with failure as an option if anything goes wrong.
func (cl *ClusterVerification) WaitForOrFail(atLeast int, timeout time.Duration) {
pods, err := cl.WaitFor(atLeast, timeout)
if err != nil || len(pods) < atLeast {
Failf("Verified %v of %v pods , error : %v", len(pods), atLeast, err)
}
}
// ForEach runs a function against every verifiable pod. Be warned that this doesn't wait for "n" pods to verifiy,
// so it may return very quickly if you have strict pod state requirements.
//
// For example, if you require at least 5 pods to be running before your test will pass,
// its smart to first call "clusterVerification.WaitFor(5)" before you call clusterVerification.ForEach.
func (cl *ClusterVerification) ForEach(podFunc func(v1.Pod)) error {
pods, err := cl.podState.filter(cl.client, cl.namespace)
if err == nil {
if len(pods) == 0 {
Failf("No pods matched the filter.")
}
Logf("ForEach: Found %v pods from the filter. Now looping through them.", len(pods))
for _, p := range pods {
podFunc(p)
}
} else {
Logf("ForEach: Something went wrong when filtering pods to execute against: %v", err)
}
return err
}
// GetLogToFileFunc is a convenience function that returns a function that have the same interface as
// Logf, but writes to a specified file.
func GetLogToFileFunc(file *os.File) func(format string, args ...interface{}) {
return func(format string, args ...interface{}) {
writer := bufio.NewWriter(file)
if _, err := fmt.Fprintf(writer, format, args...); err != nil {
Logf("Failed to write file %v with test performance data: %v", file.Name(), err)
}
writer.Flush()
}
}