blob: caeda4c0ff036ae29a71e7d300275022093125cd [file] [log] [blame]
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"encoding/json"
"fmt"
"net"
goruntime "runtime"
"sort"
"strconv"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/nodestatus"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
taintutil "k8s.io/kubernetes/pkg/util/taints"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume/util"
)
// maxImageTagsForTest is used by generateImageTags as the upper limit when
// picking how many repo tags to generate for a single test image.
const maxImageTagsForTest = 20
// generateTestingImageLists creates count images with random IDs, tags and
// sizes. It returns those images together with the image list that
// makeExpectedImageList derives from them for the given maxImages limit.
func generateTestingImageLists(count int, maxImages int) ([]kubecontainer.Image, []v1.ContainerImage) {
	// images is the randomly generated runtime image list.
	var images []kubecontainer.Image
	for remaining := count; remaining > 0; remaining-- {
		images = append(images, kubecontainer.Image{
			ID:       string(uuid.NewUUID()),
			RepoTags: generateImageTags(),
			Size:     rand.Int63nRange(minImgSize, maxImgSize+1),
		})
	}

	// makeExpectedImageList sorts images in place before converting them.
	expected := makeExpectedImageList(images, maxImages)
	return images, expected
}
// makeExpectedImageList converts imageList into the []v1.ContainerImage that
// the tests expect to find in the node status.
//
// imageList is sorted in place with sliceutils.ByImageSize. Every image
// contributes at most nodestatus.MaxNamesPerImageInNodeStatus of its repo tags,
// and at most maxImages images are returned. A negative maxImages (the callers
// use -1) means no limit. A maxImages larger than the number of images, or an
// image with fewer tags than the per-image cap, is clamped instead of causing
// a slice-bounds panic.
func makeExpectedImageList(imageList []kubecontainer.Image, maxImages int) []v1.ContainerImage {
	// 1. sort the imageList by size
	sort.Sort(sliceutils.ByImageSize(imageList))

	// 2. convert the sorted imageList to a v1.ContainerImage list
	var expectedImageList []v1.ContainerImage
	for _, kubeImage := range imageList {
		names := kubeImage.RepoTags
		// Only truncate when there is something to truncate; slicing a
		// shorter tag list up to the cap would panic.
		if len(names) > nodestatus.MaxNamesPerImageInNodeStatus {
			names = names[0:nodestatus.MaxNamesPerImageInNodeStatus]
		}
		expectedImageList = append(expectedImageList, v1.ContainerImage{
			Names:     names,
			SizeBytes: kubeImage.Size,
		})
	}

	// 3. only return the first maxImages images of expectedImageList
	if maxImages < 0 || maxImages >= len(expectedImageList) {
		// No limit, or the limit does not cut anything off.
		return expectedImageList
	}
	return expectedImageList[0:maxImages]
}
// generateImageTags returns a list of "k8s.gcr.io:v<N>" tags whose length is
// chosen at random, with N counting down to 1.
func generateImageTags() []string {
	// Ask for more than MaxNamesPerImageInNodeStatus tags so that the test can
	// verify that the kubelet reports at most MaxNamesPerImageInNodeStatus tags.
	numTags := rand.IntnRange(nodestatus.MaxNamesPerImageInNodeStatus+1, maxImageTagsForTest+1)

	var tags []string
	for n := numTags; n > 0; n-- {
		tags = append(tags, "k8s.gcr.io:v"+strconv.Itoa(n))
	}
	return tags
}
// applyNodeStatusPatch applies the strategic merge patch to the JSON encoding
// of originalNode and returns the result decoded into a new v1.Node.
// originalNode itself is only read. An error is returned if marshalling,
// patching or unmarshalling fails.
func applyNodeStatusPatch(originalNode *v1.Node, patch []byte) (*v1.Node, error) {
	originalJSON, err := json.Marshal(originalNode)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal original node %#v: %v", originalNode, err)
	}

	patchedJSON, err := strategicpatch.StrategicMergePatch(originalJSON, patch, v1.Node{})
	if err != nil {
		return nil, fmt.Errorf("failed to apply strategic merge patch %q on node %#v: %v",
			patch, originalNode, err)
	}

	var patched v1.Node
	err = json.Unmarshal(patchedJSON, &patched)
	if err != nil {
		return nil, fmt.Errorf("failed to unmarshal updated node %q: %v", patchedJSON, err)
	}
	return &patched, nil
}
// notImplemented is a reaction function that reports every action as handled
// and fails it with an error naming the action.
func notImplemented(action core.Action) (bool, runtime.Object, error) {
	err := fmt.Errorf("no reaction implemented for %s", action)
	return true, nil, err
}
// addNotImplatedReaction registers notImplemented on kubeClient for every verb
// and resource ("*", "*"). A nil kubeClient is ignored.
func addNotImplatedReaction(kubeClient *fake.Clientset) {
	if kubeClient != nil {
		kubeClient.AddReactor("*", "*", notImplemented)
	}
}
// localCM wraps a cm.ContainerManager and overrides the allocatable
// reservation and the capacity it reports with fixed, test-supplied values.
type localCM struct {
	cm.ContainerManager
	// allocatableReservation is returned by GetNodeAllocatableReservation.
	allocatableReservation v1.ResourceList
	// capacity is returned by GetCapacity.
	capacity v1.ResourceList
}

// GetNodeAllocatableReservation returns the configured reservation.
func (m *localCM) GetNodeAllocatableReservation() v1.ResourceList {
	return m.allocatableReservation
}

// GetCapacity returns the configured capacity.
func (m *localCM) GetCapacity() v1.ResourceList {
	return m.capacity
}
// sortableNodeAddress implements sort.Interface for []v1.NodeAddress.
type sortableNodeAddress []v1.NodeAddress

// Len returns the number of addresses.
func (s sortableNodeAddress) Len() int {
	return len(s)
}

// Less orders addresses by the string formed from the address type
// immediately followed by the address.
func (s sortableNodeAddress) Less(i, j int) bool {
	left := string(s[i].Type) + s[i].Address
	right := string(s[j].Type) + s[j].Address
	return left < right
}

// Swap exchanges the addresses at positions i and j.
func (s sortableNodeAddress) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

// sortNodeAddresses sorts addrs in place using the ordering defined by Less.
func sortNodeAddresses(addrs sortableNodeAddress) {
	sort.Sort(addrs)
}
// TestUpdateNewNodeStatus verifies the status patch that updateNodeStatus
// sends for a node which exists in the API with an empty status, both with an
// image limit and without one (nodeStatusMaxImages == -1).
func TestUpdateNewNodeStatus(t *testing.T) {
	cases := []struct {
		desc                string
		nodeStatusMaxImages int32
	}{
		{
			desc:                "5 image limit",
			nodeStatusMaxImages: 5,
		},
		{
			desc:                "no image limit",
			nodeStatusMaxImages: -1,
		},
	}

	for _, tc := range cases {
		t.Run(tc.desc, func(t *testing.T) {
			// generate one more in inputImageList than we configure the Kubelet to report,
			// or 5 images if unlimited
			numTestImages := int(tc.nodeStatusMaxImages) + 1
			if tc.nodeStatusMaxImages == -1 {
				numTestImages = 5
			}
			inputImageList, expectedImageList := generateTestingImageLists(numTestImages, int(tc.nodeStatusMaxImages))
			testKubelet := newTestKubeletWithImageList(
				t, inputImageList, false /* controllerAttachDetachEnabled */, true /*initFakeVolumePlugin*/)
			defer testKubelet.Cleanup()
			kubelet := testKubelet.kubelet
			kubelet.nodeStatusMaxImages = tc.nodeStatusMaxImages
			kubelet.kubeClient = nil // ensure only the heartbeat client is used
			kubelet.containerManager = &localCM{
				ContainerManager: cm.NewStubContainerManager(),
				allocatableReservation: v1.ResourceList{
					v1.ResourceCPU:              *resource.NewMilliQuantity(200, resource.DecimalSI),
					v1.ResourceMemory:           *resource.NewQuantity(100E6, resource.BinarySI),
					v1.ResourceEphemeralStorage: *resource.NewQuantity(2000, resource.BinarySI),
				},
				capacity: v1.ResourceList{
					v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
					v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
					v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
				},
			}
			// Since this test retroactively overrides the stub container manager,
			// we have to regenerate default status setters.
			kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()

			kubeClient := testKubelet.fakeKubeClient
			existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
			kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
			machineInfo := &cadvisorapi.MachineInfo{
				MachineID:      "123",
				SystemUUID:     "abc",
				BootID:         "1b3",
				NumCores:       2,
				MemoryCapacity: 10E9, // 10G
			}
			kubelet.machineInfo = machineInfo

			// Condition timestamps are left zero here; the timestamps of the
			// actual node are checked and then cleared before the comparison.
			expectedNode := &v1.Node{
				ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
				Spec:       v1.NodeSpec{},
				Status: v1.NodeStatus{
					Conditions: []v1.NodeCondition{
						{
							Type:               v1.NodeMemoryPressure,
							Status:             v1.ConditionFalse,
							Reason:             "KubeletHasSufficientMemory",
							Message:            "kubelet has sufficient memory available",
							LastHeartbeatTime:  metav1.Time{},
							LastTransitionTime: metav1.Time{},
						},
						{
							Type:               v1.NodeDiskPressure,
							Status:             v1.ConditionFalse,
							Reason:             "KubeletHasNoDiskPressure",
							Message:            "kubelet has no disk pressure",
							LastHeartbeatTime:  metav1.Time{},
							LastTransitionTime: metav1.Time{},
						},
						{
							Type:               v1.NodePIDPressure,
							Status:             v1.ConditionFalse,
							Reason:             "KubeletHasSufficientPID",
							Message:            "kubelet has sufficient PID available",
							LastHeartbeatTime:  metav1.Time{},
							LastTransitionTime: metav1.Time{},
						},
						{
							Type:               v1.NodeReady,
							Status:             v1.ConditionTrue,
							Reason:             "KubeletReady",
							Message:            "kubelet is posting ready status",
							LastHeartbeatTime:  metav1.Time{},
							LastTransitionTime: metav1.Time{},
						},
					},
					NodeInfo: v1.NodeSystemInfo{
						MachineID:               "123",
						SystemUUID:              "abc",
						BootID:                  "1b3",
						KernelVersion:           cadvisortest.FakeKernelVersion,
						OSImage:                 cadvisortest.FakeContainerOsVersion,
						OperatingSystem:         goruntime.GOOS,
						Architecture:            goruntime.GOARCH,
						ContainerRuntimeVersion: "test://1.5.0",
						KubeletVersion:          version.Get().String(),
						KubeProxyVersion:        version.Get().String(),
					},
					Capacity: v1.ResourceList{
						v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
						v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
						v1.ResourcePods:             *resource.NewQuantity(0, resource.DecimalSI),
						v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
					},
					Allocatable: v1.ResourceList{
						v1.ResourceCPU:              *resource.NewMilliQuantity(1800, resource.DecimalSI),
						v1.ResourceMemory:           *resource.NewQuantity(9900E6, resource.BinarySI),
						v1.ResourcePods:             *resource.NewQuantity(0, resource.DecimalSI),
						v1.ResourceEphemeralStorage: *resource.NewQuantity(3000, resource.BinarySI),
					},
					Addresses: []v1.NodeAddress{
						{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
						{Type: v1.NodeHostName, Address: testKubeletHostname},
					},
					Images: expectedImageList,
				},
			}

			kubelet.updateRuntimeUp()
			assert.NoError(t, kubelet.updateNodeStatus())
			actions := kubeClient.Actions()
			require.Len(t, actions, 2)
			require.True(t, actions[1].Matches("patch", "nodes"))
			require.Equal(t, "status", actions[1].GetSubresource())

			updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch())
			// updatedNode is nil on error, so stop here instead of dereferencing it.
			require.NoError(t, err)
			for i, cond := range updatedNode.Status.Conditions {
				assert.False(t, cond.LastHeartbeatTime.IsZero(), "LastHeartbeatTime for %v condition is zero", cond.Type)
				assert.False(t, cond.LastTransitionTime.IsZero(), "LastTransitionTime for %v condition is zero", cond.Type)
				updatedNode.Status.Conditions[i].LastHeartbeatTime = metav1.Time{}
				updatedNode.Status.Conditions[i].LastTransitionTime = metav1.Time{}
			}

			// Guard the index below against an empty condition list.
			require.NotEmpty(t, updatedNode.Status.Conditions)
			// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961
			assert.Equal(t, v1.NodeReady, updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type,
				"NodeReady should be the last condition")
			assert.Len(t, updatedNode.Status.Images, len(expectedImageList))
			assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode))
		})
	}
}
// TestUpdateExistingNodeStatus verifies the status patch that updateNodeStatus
// sends for a node which already carries conditions, capacity and allocatable
// values. Heartbeat times are expected to change while transition times are
// expected to stay at their previous value.
func TestUpdateExistingNodeStatus(t *testing.T) {
	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
	defer testKubelet.Cleanup()
	kubelet := testKubelet.kubelet
	kubelet.nodeStatusMaxImages = 5 // don't truncate the image list that gets constructed by hand for this test
	kubelet.kubeClient = nil        // ensure only the heartbeat client is used
	kubelet.containerManager = &localCM{
		ContainerManager: cm.NewStubContainerManager(),
		allocatableReservation: v1.ResourceList{
			v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
			v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
		},
		capacity: v1.ResourceList{
			v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
			v1.ResourceMemory:           *resource.NewQuantity(20E9, resource.BinarySI),
			v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
		},
	}
	// Since this test retroactively overrides the stub container manager,
	// we have to regenerate default status setters.
	kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()

	// oldTime is the timestamp carried by every condition of the existing node.
	oldTime := metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)

	kubeClient := testKubelet.fakeKubeClient
	existingNode := v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
		Spec:       v1.NodeSpec{},
		Status: v1.NodeStatus{
			Conditions: []v1.NodeCondition{
				{
					Type:               v1.NodeMemoryPressure,
					Status:             v1.ConditionFalse,
					Reason:             "KubeletHasSufficientMemory",
					Message:            "kubelet has sufficient memory available",
					LastHeartbeatTime:  oldTime,
					LastTransitionTime: oldTime,
				},
				{
					Type:               v1.NodeDiskPressure,
					Status:             v1.ConditionFalse,
					Reason:             "KubeletHasSufficientDisk",
					Message:            "kubelet has sufficient disk space available",
					LastHeartbeatTime:  oldTime,
					LastTransitionTime: oldTime,
				},
				{
					Type:               v1.NodePIDPressure,
					Status:             v1.ConditionFalse,
					Reason:             "KubeletHasSufficientPID",
					Message:            "kubelet has sufficient PID available",
					LastHeartbeatTime:  oldTime,
					LastTransitionTime: oldTime,
				},
				{
					Type:               v1.NodeReady,
					Status:             v1.ConditionTrue,
					Reason:             "KubeletReady",
					Message:            "kubelet is posting ready status",
					LastHeartbeatTime:  oldTime,
					LastTransitionTime: oldTime,
				},
			},
			Capacity: v1.ResourceList{
				v1.ResourceCPU:    *resource.NewMilliQuantity(3000, resource.DecimalSI),
				v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
				v1.ResourcePods:   *resource.NewQuantity(0, resource.DecimalSI),
			},
			Allocatable: v1.ResourceList{
				v1.ResourceCPU:    *resource.NewMilliQuantity(2800, resource.DecimalSI),
				v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI),
				v1.ResourcePods:   *resource.NewQuantity(0, resource.DecimalSI),
			},
		},
	}
	kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
	machineInfo := &cadvisorapi.MachineInfo{
		MachineID:      "123",
		SystemUUID:     "abc",
		BootID:         "1b3",
		NumCores:       2,
		MemoryCapacity: 20E9,
	}
	kubelet.machineInfo = machineInfo

	// Condition timestamps are left zero here; the timestamps of the actual
	// node are checked and then cleared before the comparison.
	expectedNode := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
		Spec:       v1.NodeSpec{},
		Status: v1.NodeStatus{
			Conditions: []v1.NodeCondition{
				{
					Type:               v1.NodeMemoryPressure,
					Status:             v1.ConditionFalse,
					Reason:             "KubeletHasSufficientMemory",
					Message:            "kubelet has sufficient memory available",
					LastHeartbeatTime:  metav1.Time{},
					LastTransitionTime: metav1.Time{},
				},
				{
					Type:               v1.NodeDiskPressure,
					Status:             v1.ConditionFalse,
					Reason:             "KubeletHasSufficientDisk",
					Message:            "kubelet has sufficient disk space available",
					LastHeartbeatTime:  metav1.Time{},
					LastTransitionTime: metav1.Time{},
				},
				{
					Type:               v1.NodePIDPressure,
					Status:             v1.ConditionFalse,
					Reason:             "KubeletHasSufficientPID",
					Message:            "kubelet has sufficient PID available",
					LastHeartbeatTime:  metav1.Time{},
					LastTransitionTime: metav1.Time{},
				},
				{
					Type:               v1.NodeReady,
					Status:             v1.ConditionTrue,
					Reason:             "KubeletReady",
					Message:            "kubelet is posting ready status",
					LastHeartbeatTime:  metav1.Time{}, // placeholder
					LastTransitionTime: metav1.Time{}, // placeholder
				},
			},
			NodeInfo: v1.NodeSystemInfo{
				MachineID:               "123",
				SystemUUID:              "abc",
				BootID:                  "1b3",
				KernelVersion:           cadvisortest.FakeKernelVersion,
				OSImage:                 cadvisortest.FakeContainerOsVersion,
				OperatingSystem:         goruntime.GOOS,
				Architecture:            goruntime.GOARCH,
				ContainerRuntimeVersion: "test://1.5.0",
				KubeletVersion:          version.Get().String(),
				KubeProxyVersion:        version.Get().String(),
			},
			Capacity: v1.ResourceList{
				v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
				v1.ResourceMemory:           *resource.NewQuantity(20E9, resource.BinarySI),
				v1.ResourcePods:             *resource.NewQuantity(0, resource.DecimalSI),
				v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
			},
			Allocatable: v1.ResourceList{
				v1.ResourceCPU:              *resource.NewMilliQuantity(1800, resource.DecimalSI),
				v1.ResourceMemory:           *resource.NewQuantity(19900E6, resource.BinarySI),
				v1.ResourcePods:             *resource.NewQuantity(0, resource.DecimalSI),
				v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
			},
			Addresses: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			// images will be sorted from max to min in node status.
			// NOTE(review): the entries below are listed smallest first, which
			// does not match the comment above - confirm which is intended.
			Images: []v1.ContainerImage{
				{
					Names:     []string{"k8s.gcr.io:v1", "k8s.gcr.io:v2"},
					SizeBytes: 123,
				},
				{
					Names:     []string{"k8s.gcr.io:v3", "k8s.gcr.io:v4"},
					SizeBytes: 456,
				},
			},
		},
	}

	kubelet.updateRuntimeUp()
	assert.NoError(t, kubelet.updateNodeStatus())

	actions := kubeClient.Actions()
	// require (not assert): the index and the type assertion below would
	// panic if either of these expectations did not hold.
	require.Len(t, actions, 2)
	require.IsType(t, core.PatchActionImpl{}, actions[1])
	patchAction := actions[1].(core.PatchActionImpl)

	updatedNode, err := applyNodeStatusPatch(&existingNode, patchAction.GetPatch())
	require.NoError(t, err)

	old := oldTime.Time
	for i, cond := range updatedNode.Status.Conditions {
		// Expect LastHearbeat to be updated to Now, while LastTransitionTime to be the same.
		assert.NotEqual(t, old, cond.LastHeartbeatTime.Rfc3339Copy().UTC(), "LastHeartbeatTime for condition %v", cond.Type)
		assert.EqualValues(t, old, cond.LastTransitionTime.Rfc3339Copy().UTC(), "LastTransitionTime for condition %v", cond.Type)

		updatedNode.Status.Conditions[i].LastHeartbeatTime = metav1.Time{}
		updatedNode.Status.Conditions[i].LastTransitionTime = metav1.Time{}
	}

	// Guard the index below against an empty condition list.
	require.NotEmpty(t, updatedNode.Status.Conditions)
	// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961
	assert.Equal(t, v1.NodeReady, updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type,
		"NodeReady should be the last condition")
	assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode))
}
func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
attempts := int64(0)
failureCallbacks := int64(0)
// set up a listener that hangs connections
ln, err := net.Listen("tcp", "127.0.0.1:0")
assert.NoError(t, err)
defer ln.Close()
go func() {
// accept connections and just let them hang
for {
_, err := ln.Accept()
if err != nil {
t.Log(err)
return
}
t.Log("accepted connection")
atomic.AddInt64(&attempts, 1)
}
}()
config := &rest.Config{
Host: "http://" + ln.Addr().String(),
QPS: -1,
Timeout: time.Second,
}
assert.NoError(t, err)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
kubelet.kubeClient = nil // ensure only the heartbeat client is used
kubelet.heartbeatClient, err = clientset.NewForConfig(config)
kubelet.onRepeatedHeartbeatFailure = func() {
atomic.AddInt64(&failureCallbacks, 1)
}
kubelet.containerManager = &localCM{
ContainerManager: cm.NewStubContainerManager(),
allocatableReservation: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
},
capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
},
}
// should return an error, but not hang
assert.Error(t, kubelet.updateNodeStatus())
// should have attempted multiple times
if actualAttempts := atomic.LoadInt64(&attempts); actualAttempts < nodeStatusUpdateRetry {
t.Errorf("Expected at least %d attempts, got %d", nodeStatusUpdateRetry, actualAttempts)
}
// should have gotten multiple failure callbacks
if actualFailureCallbacks := atomic.LoadInt64(&failureCallbacks); actualFailureCallbacks < (nodeStatusUpdateRetry - 1) {
t.Errorf("Expected %d failure callbacks, got %d", (nodeStatusUpdateRetry - 1), actualFailureCallbacks)
}
}
// TestUpdateNodeStatusWithRuntimeStateError verifies the NodeReady condition
// reported by updateNodeStatus while the fake clock and the fake runtime's
// status are varied: stale runtime checks, a failing status call, a nil or
// empty runtime status, and RuntimeReady/NetworkReady being true or false.
func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
	defer testKubelet.Cleanup()
	kubelet := testKubelet.kubelet
	kubelet.nodeStatusMaxImages = 5 // don't truncate the image list that gets constructed by hand for this test
	kubelet.kubeClient = nil        // ensure only the heartbeat client is used
	kubelet.containerManager = &localCM{
		ContainerManager: cm.NewStubContainerManager(),
		allocatableReservation: v1.ResourceList{
			v1.ResourceCPU:              *resource.NewMilliQuantity(200, resource.DecimalSI),
			v1.ResourceMemory:           *resource.NewQuantity(100E6, resource.BinarySI),
			v1.ResourceEphemeralStorage: *resource.NewQuantity(10E9, resource.BinarySI),
		},
		capacity: v1.ResourceList{
			v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
			v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
			v1.ResourceEphemeralStorage: *resource.NewQuantity(20E9, resource.BinarySI),
		},
	}
	// Since this test retroactively overrides the stub container manager,
	// we have to regenerate default status setters.
	kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()

	clock := testKubelet.fakeClock
	kubeClient := testKubelet.fakeKubeClient
	existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
	kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
	machineInfo := &cadvisorapi.MachineInfo{
		MachineID:      "123",
		SystemUUID:     "abc",
		BootID:         "1b3",
		NumCores:       2,
		MemoryCapacity: 10E9,
	}
	kubelet.machineInfo = machineInfo

	// The last condition is a placeholder that checkNodeStatus overwrites
	// with the NodeReady condition expected for each scenario.
	expectedNode := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
		Spec:       v1.NodeSpec{},
		Status: v1.NodeStatus{
			Conditions: []v1.NodeCondition{
				{
					Type:               v1.NodeMemoryPressure,
					Status:             v1.ConditionFalse,
					Reason:             "KubeletHasSufficientMemory",
					Message:            "kubelet has sufficient memory available",
					LastHeartbeatTime:  metav1.Time{},
					LastTransitionTime: metav1.Time{},
				},
				{
					Type:               v1.NodeDiskPressure,
					Status:             v1.ConditionFalse,
					Reason:             "KubeletHasNoDiskPressure",
					Message:            "kubelet has no disk pressure",
					LastHeartbeatTime:  metav1.Time{},
					LastTransitionTime: metav1.Time{},
				},
				{
					Type:               v1.NodePIDPressure,
					Status:             v1.ConditionFalse,
					Reason:             "KubeletHasSufficientPID",
					Message:            "kubelet has sufficient PID available",
					LastHeartbeatTime:  metav1.Time{},
					LastTransitionTime: metav1.Time{},
				},
				{}, //placeholder
			},
			NodeInfo: v1.NodeSystemInfo{
				MachineID:               "123",
				SystemUUID:              "abc",
				BootID:                  "1b3",
				KernelVersion:           cadvisortest.FakeKernelVersion,
				OSImage:                 cadvisortest.FakeContainerOsVersion,
				OperatingSystem:         goruntime.GOOS,
				Architecture:            goruntime.GOARCH,
				ContainerRuntimeVersion: "test://1.5.0",
				KubeletVersion:          version.Get().String(),
				KubeProxyVersion:        version.Get().String(),
			},
			Capacity: v1.ResourceList{
				v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
				v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
				v1.ResourcePods:             *resource.NewQuantity(0, resource.DecimalSI),
				v1.ResourceEphemeralStorage: *resource.NewQuantity(20E9, resource.BinarySI),
			},
			Allocatable: v1.ResourceList{
				v1.ResourceCPU:              *resource.NewMilliQuantity(1800, resource.DecimalSI),
				v1.ResourceMemory:           *resource.NewQuantity(9900E6, resource.BinarySI),
				v1.ResourcePods:             *resource.NewQuantity(0, resource.DecimalSI),
				v1.ResourceEphemeralStorage: *resource.NewQuantity(10E9, resource.BinarySI),
			},
			Addresses: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			Images: []v1.ContainerImage{
				{
					Names:     []string{"k8s.gcr.io:v1", "k8s.gcr.io:v2"},
					SizeBytes: 123,
				},
				{
					Names:     []string{"k8s.gcr.io:v3", "k8s.gcr.io:v4"},
					SizeBytes: 456,
				},
			},
		},
	}

	// checkNodeStatus runs one status update and verifies that the node
	// stored in the fake client matches expectedNode, with the NodeReady
	// condition carrying the given status and reason.
	checkNodeStatus := func(status v1.ConditionStatus, reason string) {
		kubeClient.ClearActions()
		assert.NoError(t, kubelet.updateNodeStatus())
		actions := kubeClient.Actions()
		require.Len(t, actions, 2)
		require.True(t, actions[1].Matches("patch", "nodes"))
		require.Equal(t, "status", actions[1].GetSubresource())

		updatedNode, err := kubeClient.CoreV1().Nodes().Get(testKubeletHostname, metav1.GetOptions{})
		require.NoError(t, err, "can't get updated node")
		for i, cond := range updatedNode.Status.Conditions {
			assert.False(t, cond.LastHeartbeatTime.IsZero(), "LastHeartbeatTime for %v condition is zero", cond.Type)
			assert.False(t, cond.LastTransitionTime.IsZero(), "LastTransitionTime for %v condition is zero", cond.Type)
			updatedNode.Status.Conditions[i].LastHeartbeatTime = metav1.Time{}
			updatedNode.Status.Conditions[i].LastTransitionTime = metav1.Time{}
		}

		// Guard the index below against an empty condition list.
		require.NotEmpty(t, updatedNode.Status.Conditions)
		// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961
		lastIndex := len(updatedNode.Status.Conditions) - 1
		assert.Equal(t, v1.NodeReady, updatedNode.Status.Conditions[lastIndex].Type, "NodeReady should be the last condition")
		assert.NotEmpty(t, updatedNode.Status.Conditions[lastIndex].Message)

		// The message is only required to be non-empty; clear it so that it
		// does not take part in the comparison below.
		updatedNode.Status.Conditions[lastIndex].Message = ""
		expectedNode.Status.Conditions[lastIndex] = v1.NodeCondition{
			Type:               v1.NodeReady,
			Status:             status,
			Reason:             reason,
			LastHeartbeatTime:  metav1.Time{},
			LastTransitionTime: metav1.Time{},
		}
		assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode))
	}

	// TODO(random-liu): Refactor the unit test to be table driven test.
	// Should report kubelet not ready if the runtime check is out of date
	clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime))
	kubelet.updateRuntimeUp()
	checkNodeStatus(v1.ConditionFalse, "KubeletNotReady")

	// Should report kubelet ready if the runtime check is updated
	clock.SetTime(time.Now())
	kubelet.updateRuntimeUp()
	checkNodeStatus(v1.ConditionTrue, "KubeletReady")

	// Should report kubelet not ready if the runtime check is out of date
	clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime))
	kubelet.updateRuntimeUp()
	checkNodeStatus(v1.ConditionFalse, "KubeletNotReady")

	// Should report kubelet not ready if the runtime check failed
	fakeRuntime := testKubelet.fakeRuntime
	// Inject error into fake runtime status check, node should be NotReady
	fakeRuntime.StatusErr = fmt.Errorf("injected runtime status error")
	clock.SetTime(time.Now())
	kubelet.updateRuntimeUp()
	checkNodeStatus(v1.ConditionFalse, "KubeletNotReady")

	fakeRuntime.StatusErr = nil

	// Should report node not ready if runtime status is nil.
	fakeRuntime.RuntimeStatus = nil
	kubelet.updateRuntimeUp()
	checkNodeStatus(v1.ConditionFalse, "KubeletNotReady")

	// Should report node not ready if runtime status is empty.
	fakeRuntime.RuntimeStatus = &kubecontainer.RuntimeStatus{}
	kubelet.updateRuntimeUp()
	checkNodeStatus(v1.ConditionFalse, "KubeletNotReady")

	// Should report node not ready if RuntimeReady is false.
	fakeRuntime.RuntimeStatus = &kubecontainer.RuntimeStatus{
		Conditions: []kubecontainer.RuntimeCondition{
			{Type: kubecontainer.RuntimeReady, Status: false},
			{Type: kubecontainer.NetworkReady, Status: true},
		},
	}
	kubelet.updateRuntimeUp()
	checkNodeStatus(v1.ConditionFalse, "KubeletNotReady")

	// Should report node ready if RuntimeReady is true.
	fakeRuntime.RuntimeStatus = &kubecontainer.RuntimeStatus{
		Conditions: []kubecontainer.RuntimeCondition{
			{Type: kubecontainer.RuntimeReady, Status: true},
			{Type: kubecontainer.NetworkReady, Status: true},
		},
	}
	kubelet.updateRuntimeUp()
	checkNodeStatus(v1.ConditionTrue, "KubeletReady")

	// Should report node not ready if NetworkReady is false.
	fakeRuntime.RuntimeStatus = &kubecontainer.RuntimeStatus{
		Conditions: []kubecontainer.RuntimeCondition{
			{Type: kubecontainer.RuntimeReady, Status: true},
			{Type: kubecontainer.NetworkReady, Status: false},
		},
	}
	kubelet.updateRuntimeUp()
	checkNodeStatus(v1.ConditionFalse, "KubeletNotReady")
}
// TestUpdateNodeStatusError verifies that updateNodeStatus fails when the API
// holds no node for the kubelet, and that the fake client recorded exactly
// nodeStatusUpdateRetry actions.
func TestUpdateNodeStatusError(t *testing.T) {
	tk := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
	defer tk.Cleanup()

	kl := tk.kubelet
	kl.kubeClient = nil // ensure only the heartbeat client is used

	// No matching node for the kubelet
	emptyNodes := &v1.NodeList{Items: []v1.Node{}}
	tk.fakeKubeClient.ReactionChain = fake.NewSimpleClientset(emptyNodes).ReactionChain

	updateErr := kl.updateNodeStatus()
	assert.Error(t, updateErr)

	recorded := tk.fakeKubeClient.Actions()
	assert.Len(t, recorded, nodeStatusUpdateRetry)
}
func TestUpdateNodeStatusWithLease(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeLease, true)()
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
clock := testKubelet.fakeClock
kubelet := testKubelet.kubelet
kubelet.nodeStatusMaxImages = 5 // don't truncate the image list that gets constructed by hand for this test
kubelet.kubeClient = nil // ensure only the heartbeat client is used
kubelet.containerManager = &localCM{
ContainerManager: cm.NewStubContainerManager(),
allocatableReservation: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
},
capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
}
// Since this test retroactively overrides the stub container manager,
// we have to regenerate default status setters.
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
kubelet.nodeStatusReportFrequency = time.Minute
kubeClient := testKubelet.fakeKubeClient
existingNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*existingNode}}).ReactionChain
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 20E9,
}
kubelet.machineInfo = machineInfo
now := metav1.NewTime(clock.Now()).Rfc3339Copy()
expectedNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeMemoryPressure,
Status: v1.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: now,
LastTransitionTime: now,
},
{
Type: v1.NodeDiskPressure,
Status: v1.ConditionFalse,
Reason: "KubeletHasNoDiskPressure",
Message: fmt.Sprintf("kubelet has no disk pressure"),
LastHeartbeatTime: now,
LastTransitionTime: now,
},
{
Type: v1.NodePIDPressure,
Status: v1.ConditionFalse,
Reason: "KubeletHasSufficientPID",
Message: fmt.Sprintf("kubelet has sufficient PID available"),
LastHeartbeatTime: now,
LastTransitionTime: now,
},
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: now,
LastTransitionTime: now,
},
},
NodeInfo: v1.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: cadvisortest.FakeKernelVersion,
OSImage: cadvisortest.FakeContainerOsVersion,
OperatingSystem: goruntime.GOOS,
Architecture: goruntime.GOARCH,
ContainerRuntimeVersion: "test://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI),
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
Addresses: []v1.NodeAddress{
{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
{Type: v1.NodeHostName, Address: testKubeletHostname},
},
// images will be sorted from max to min in node status.
Images: []v1.ContainerImage{
{
Names: []string{"k8s.gcr.io:v1", "k8s.gcr.io:v2"},
SizeBytes: 123,
},
{
Names: []string{"k8s.gcr.io:v3", "k8s.gcr.io:v4"},
SizeBytes: 456,
},
},
},
}
// Update node status when node status is created.
// Report node status.
kubelet.updateRuntimeUp()
assert.NoError(t, kubelet.updateNodeStatus())
actions := kubeClient.Actions()
assert.Len(t, actions, 2)
assert.IsType(t, core.GetActionImpl{}, actions[0])
assert.IsType(t, core.PatchActionImpl{}, actions[1])
patchAction := actions[1].(core.PatchActionImpl)
updatedNode, err := applyNodeStatusPatch(existingNode, patchAction.GetPatch())
require.NoError(t, err)
for _, cond := range updatedNode.Status.Conditions {
cond.LastHeartbeatTime = cond.LastHeartbeatTime.Rfc3339Copy()
cond.LastTransitionTime = cond.LastTransitionTime.Rfc3339Copy()
}
assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode))
// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961
assert.Equal(t, v1.NodeReady, updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type,
"NodeReady should be the last condition")
// Update node status again when nothing is changed (except heartbeat time).
// Report node status if it has exceeded the duration of nodeStatusReportFrequency.
clock.Step(time.Minute)
assert.NoError(t, kubelet.updateNodeStatus())
// 2 more action (There were 2 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 4)
assert.IsType(t, core.GetActionImpl{}, actions[2])
assert.IsType(t, core.PatchActionImpl{}, actions[3])
patchAction = actions[3].(core.PatchActionImpl)
updatedNode, err = applyNodeStatusPatch(updatedNode, patchAction.GetPatch())
require.NoError(t, err)
for _, cond := range updatedNode.Status.Conditions {
cond.LastHeartbeatTime = cond.LastHeartbeatTime.Rfc3339Copy()
cond.LastTransitionTime = cond.LastTransitionTime.Rfc3339Copy()
}
// Expect LastHeartbeatTime updated, other things unchanged.
for i, cond := range expectedNode.Status.Conditions {
expectedNode.Status.Conditions[i].LastHeartbeatTime = metav1.NewTime(cond.LastHeartbeatTime.Time.Add(time.Minute)).Rfc3339Copy()
}
assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode))
// Update node status again when nothing is changed (except heartbeat time).
// Do not report node status if it is within the duration of nodeStatusReportFrequency.
clock.Step(10 * time.Second)
assert.NoError(t, kubelet.updateNodeStatus())
// Only 1 more action (There were 4 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 5)
assert.IsType(t, core.GetActionImpl{}, actions[4])
// Update node status again when something is changed.
// Report node status even if it is still within the duration of nodeStatusReportFrequency.
clock.Step(10 * time.Second)
var newMemoryCapacity int64 = 40E9
kubelet.machineInfo.MemoryCapacity = uint64(newMemoryCapacity)
assert.NoError(t, kubelet.updateNodeStatus())
// 2 more action (There were 5 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 7)
assert.IsType(t, core.GetActionImpl{}, actions[5])
assert.IsType(t, core.PatchActionImpl{}, actions[6])
patchAction = actions[6].(core.PatchActionImpl)
updatedNode, err = applyNodeStatusPatch(updatedNode, patchAction.GetPatch())
require.NoError(t, err)
memCapacity, _ := updatedNode.Status.Capacity[v1.ResourceMemory]
updatedMemoryCapacity, _ := (&memCapacity).AsInt64()
assert.Equal(t, newMemoryCapacity, updatedMemoryCapacity, "Memory capacity")
now = metav1.NewTime(clock.Now()).Rfc3339Copy()
for _, cond := range updatedNode.Status.Conditions {
// Expect LastHeartbeatTime updated, while LastTransitionTime unchanged.
assert.Equal(t, now, cond.LastHeartbeatTime.Rfc3339Copy(),
"LastHeartbeatTime for condition %v", cond.Type)
assert.Equal(t, now, metav1.NewTime(cond.LastTransitionTime.Time.Add(time.Minute+20*time.Second)).Rfc3339Copy(),
"LastTransitionTime for condition %v", cond.Type)
}
// Update node status when changing pod CIDR.
// Report node status even if it is still within the duration of nodeStatusReportFrequency.
clock.Step(10 * time.Second)
assert.Equal(t, "", kubelet.runtimeState.podCIDR(), "Pod CIDR should be empty")
podCIDR := "10.0.0.0/24"
updatedNode.Spec.PodCIDR = podCIDR
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*updatedNode}}).ReactionChain
assert.NoError(t, kubelet.updateNodeStatus())
assert.Equal(t, podCIDR, kubelet.runtimeState.podCIDR(), "Pod CIDR should be updated now")
// 2 more action (There were 7 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 9)
assert.IsType(t, core.GetActionImpl{}, actions[7])
assert.IsType(t, core.PatchActionImpl{}, actions[8])
patchAction = actions[8].(core.PatchActionImpl)
// Update node status when keeping the pod CIDR.
// Do not report node status if it is within the duration of nodeStatusReportFrequency.
clock.Step(10 * time.Second)
assert.Equal(t, podCIDR, kubelet.runtimeState.podCIDR(), "Pod CIDR should already be updated")
assert.NoError(t, kubelet.updateNodeStatus())
// Only 1 more action (There were 9 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 10)
assert.IsType(t, core.GetActionImpl{}, actions[9])
}
// TestRegisterWithApiServer checks that registerWithAPIServer returns within
// wait.ForeverTestTimeout when the fake API server answers the create call
// with an AlreadyExists error and the get call with a node carrying the
// kubelet's hostname, OS and architecture labels.
func TestRegisterWithApiServer(t *testing.T) {
	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
	defer testKubelet.Cleanup()
	kubelet := testKubelet.kubelet
	kubeClient := testKubelet.fakeKubeClient
	kubeClient.AddReactor("create", "nodes", func(action core.Action) (bool, runtime.Object, error) {
		// Return an error on create.
		return true, &v1.Node{}, &apierrors.StatusError{
			ErrStatus: metav1.Status{Reason: metav1.StatusReasonAlreadyExists},
		}
	})
	kubeClient.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
		// Return an existing (matching) node on get.
		return true, &v1.Node{
			ObjectMeta: metav1.ObjectMeta{
				Name: testKubeletHostname,
				Labels: map[string]string{
					kubeletapis.LabelHostname: testKubeletHostname,
					kubeletapis.LabelOS:       goruntime.GOOS,
					kubeletapis.LabelArch:     goruntime.GOARCH,
				},
			},
		}, nil
	})
	addNotImplatedReaction(kubeClient)

	machineInfo := &cadvisorapi.MachineInfo{
		MachineID:      "123",
		SystemUUID:     "abc",
		BootID:         "1b3",
		NumCores:       2,
		MemoryCapacity: 1024,
	}
	kubelet.machineInfo = machineInfo

	// Registration runs in its own goroutine so the test can bound it with a
	// timeout. Completion is signalled by closing the channel rather than by
	// sending on it: a send on an unbuffered channel would block forever (and
	// leak the goroutine) if the timeout branch below had already been taken.
	done := make(chan struct{})
	go func() {
		kubelet.registerWithAPIServer()
		close(done)
	}()
	select {
	case <-time.After(wait.ForeverTestTimeout):
		assert.Fail(t, "timed out waiting for registration")
	case <-done:
	}
}
// TestTryRegisterWithApiServer drives tryRegisterWithAPIServer against a fake
// client whose create/get/patch/delete reactors return the per-case errors and
// objects from the table below. For every case it checks the boolean result
// and the number of recorded client actions. Cases with testSavedNode set
// additionally rebuild the node from the action at savedNodeIndex and check
// whether its util.ControllerManagedAttachAnnotation ("CMAD") parses as true.
func TestTryRegisterWithApiServer(t *testing.T) {
	alreadyExists := &apierrors.StatusError{
		ErrStatus: metav1.Status{Reason: metav1.StatusReasonAlreadyExists},
	}
	conflict := &apierrors.StatusError{
		ErrStatus: metav1.Status{Reason: metav1.StatusReasonConflict},
	}
	// newNode builds a node with the kubelet's default labels; when cmad is
	// true it also carries the controller-managed-attach annotation.
	newNode := func(cmad bool) *v1.Node {
		node := &v1.Node{
			ObjectMeta: metav1.ObjectMeta{
				Labels: map[string]string{
					kubeletapis.LabelHostname: testKubeletHostname,
					kubeletapis.LabelOS:       goruntime.GOOS,
					kubeletapis.LabelArch:     goruntime.GOARCH,
				},
			},
		}
		if cmad {
			node.Annotations = make(map[string]string)
			node.Annotations[util.ControllerManagedAttachAnnotation] = "true"
		}
		return node
	}
	cases := []struct {
		name            string
		newNode         *v1.Node // node passed to tryRegisterWithAPIServer
		existingNode    *v1.Node // node returned by the "get" reactor
		createError     error
		getError        error
		patchError      error
		deleteError     error
		expectedResult  bool
		expectedActions int
		testSavedNode   bool // whether to inspect the node saved by the action at savedNodeIndex
		savedNodeIndex  int
		savedNodeCMAD   bool
	}{
		{
			name:            "success case - new node",
			newNode:         &v1.Node{},
			expectedResult:  true,
			expectedActions: 1,
		},
		{
			name:            "success case - existing node - no change in CMAD",
			newNode:         newNode(true),
			createError:     alreadyExists,
			existingNode:    newNode(true),
			expectedResult:  true,
			expectedActions: 2,
		},
		{
			name:            "success case - existing node - CMAD disabled",
			newNode:         newNode(false),
			createError:     alreadyExists,
			existingNode:    newNode(true),
			expectedResult:  true,
			expectedActions: 3,
			testSavedNode:   true,
			savedNodeIndex:  2,
			savedNodeCMAD:   false,
		},
		{
			name:            "success case - existing node - CMAD enabled",
			newNode:         newNode(true),
			createError:     alreadyExists,
			existingNode:    newNode(false),
			expectedResult:  true,
			expectedActions: 3,
			testSavedNode:   true,
			savedNodeIndex:  2,
			savedNodeCMAD:   true,
		},
		{
			name:            "create failed",
			newNode:         newNode(false),
			createError:     conflict,
			expectedResult:  false,
			expectedActions: 1,
		},
		{
			name:            "get existing node failed",
			newNode:         newNode(false),
			createError:     alreadyExists,
			getError:        conflict,
			expectedResult:  false,
			expectedActions: 2,
		},
		{
			name:            "update existing node failed",
			newNode:         newNode(false),
			createError:     alreadyExists,
			existingNode:    newNode(true),
			patchError:      conflict,
			expectedResult:  false,
			expectedActions: 3,
		},
	}
	for _, tc := range cases {
		tc := tc
		// Each case runs as its own subtest so that the deferred Cleanup fires
		// at the end of the case instead of piling up until the whole test
		// function returns, and so that a require failure only aborts that case.
		t.Run(tc.name, func(t *testing.T) {
			testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled is a don't-care for this test */)
			defer testKubelet.Cleanup()
			kubelet := testKubelet.kubelet
			kubeClient := testKubelet.fakeKubeClient
			kubeClient.AddReactor("create", "nodes", func(action core.Action) (bool, runtime.Object, error) {
				return true, nil, tc.createError
			})
			kubeClient.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
				// Return an existing (matching) node on get.
				return true, tc.existingNode, tc.getError
			})
			kubeClient.AddReactor("patch", "nodes", func(action core.Action) (bool, runtime.Object, error) {
				if action.GetSubresource() == "status" {
					return true, nil, tc.patchError
				}
				return notImplemented(action)
			})
			kubeClient.AddReactor("delete", "nodes", func(action core.Action) (bool, runtime.Object, error) {
				return true, nil, tc.deleteError
			})
			addNotImplatedReaction(kubeClient)

			result := kubelet.tryRegisterWithAPIServer(tc.newNode)
			require.Equal(t, tc.expectedResult, result, "test [%s]", tc.name)

			actions := kubeClient.Actions()
			assert.Len(t, actions, tc.expectedActions, "test [%s]", tc.name)

			if !tc.testSavedNode {
				return
			}
			t.Logf("actions: %v: %+v", len(actions), actions)
			// Guard the index so a wrong action count is reported as a test
			// failure rather than an index-out-of-range panic.
			require.True(t, tc.savedNodeIndex < len(actions),
				"test [%s]: no action recorded at index %d", tc.name, tc.savedNodeIndex)

			var savedNode *v1.Node
			action := actions[tc.savedNodeIndex]
			switch action.GetVerb() {
			case "create":
				createAction := action.(core.CreateAction)
				obj := createAction.GetObject()
				require.IsType(t, &v1.Node{}, obj)
				savedNode = obj.(*v1.Node)
			case "patch":
				patchAction := action.(core.PatchActionImpl)
				var err error
				savedNode, err = applyNodeStatusPatch(tc.existingNode, patchAction.GetPatch())
				require.NoError(t, err)
			}
			// Any other verb leaves savedNode nil; fail explicitly instead of
			// dereferencing a nil pointer below.
			require.NotNil(t, savedNode, "test [%s]: unexpected verb %q at index %d",
				tc.name, action.GetVerb(), tc.savedNodeIndex)

			// The parse error is ignored on purpose: a missing annotation
			// yields "" which fails to parse and leaves actualCMAD false.
			actualCMAD, _ := strconv.ParseBool(savedNode.Annotations[util.ControllerManagedAttachAnnotation])
			assert.Equal(t, tc.savedNodeCMAD, actualCMAD, "test [%s]", tc.name)
		})
	}
}
// TestUpdateNewNodeStatusTooLargeReservation checks the Allocatable values
// that updateNodeStatus patches onto the node when the container manager
// reports a CPU reservation (40000m) larger than the CPU capacity (2000m).
// The expected allocatable CPU is 0, and the expected allocatable ephemeral
// storage is capacity minus reservation (3000 - 1000 = 2000).
func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
	const nodeStatusMaxImages = 5

	// generate one more in inputImageList than we configure the Kubelet to report
	inputImageList, _ := generateTestingImageLists(nodeStatusMaxImages+1, nodeStatusMaxImages)
	testKubelet := newTestKubeletWithImageList(
		t, inputImageList, false /* controllerAttachDetachEnabled */, true /* initFakeVolumePlugin */)
	defer testKubelet.Cleanup()
	kubelet := testKubelet.kubelet
	kubelet.nodeStatusMaxImages = nodeStatusMaxImages
	kubelet.kubeClient = nil // ensure only the heartbeat client is used
	kubelet.containerManager = &localCM{
		ContainerManager: cm.NewStubContainerManager(),
		allocatableReservation: v1.ResourceList{
			v1.ResourceCPU:              *resource.NewMilliQuantity(40000, resource.DecimalSI),
			v1.ResourceEphemeralStorage: *resource.NewQuantity(1000, resource.BinarySI),
		},
		capacity: v1.ResourceList{
			v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
			v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
			v1.ResourceEphemeralStorage: *resource.NewQuantity(3000, resource.BinarySI),
		},
	}
	// Since this test retroactively overrides the stub container manager,
	// we have to regenerate default status setters.
	kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()

	kubeClient := testKubelet.fakeKubeClient
	existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
	kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
	machineInfo := &cadvisorapi.MachineInfo{
		MachineID:      "123",
		SystemUUID:     "abc",
		BootID:         "1b3",
		NumCores:       2,
		MemoryCapacity: 10E9, // 10G
	}
	kubelet.machineInfo = machineInfo

	// Only Status.Allocatable is compared at the end of the test.
	expectedNode := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
		Spec:       v1.NodeSpec{},
		Status: v1.NodeStatus{
			Capacity: v1.ResourceList{
				v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
				v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
				v1.ResourcePods:             *resource.NewQuantity(0, resource.DecimalSI),
				v1.ResourceEphemeralStorage: *resource.NewQuantity(3000, resource.BinarySI),
			},
			Allocatable: v1.ResourceList{
				v1.ResourceCPU:              *resource.NewMilliQuantity(0, resource.DecimalSI),
				v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
				v1.ResourcePods:             *resource.NewQuantity(0, resource.DecimalSI),
				v1.ResourceEphemeralStorage: *resource.NewQuantity(2000, resource.BinarySI),
			},
		},
	}

	kubelet.updateRuntimeUp()
	assert.NoError(t, kubelet.updateNodeStatus())

	// Expect exactly two client actions, the second being a patch of the
	// node's "status" subresource.
	actions := kubeClient.Actions()
	require.Len(t, actions, 2)
	require.True(t, actions[1].Matches("patch", "nodes"))
	require.Equal(t, "status", actions[1].GetSubresource())

	updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch())
	// updatedNode is dereferenced below, so a failed patch must stop the test
	// here instead of continuing into a nil-pointer panic.
	require.NoError(t, err)
	assert.True(t, apiequality.Semantic.DeepEqual(expectedNode.Status.Allocatable, updatedNode.Status.Allocatable), "%s", diff.ObjectDiff(expectedNode.Status.Allocatable, updatedNode.Status.Allocatable))
}
// TestUpdateDefaultLabels runs updateDefaultLabels(initialNode, existingNode)
// for each table entry and checks both the returned "needs update" flag and
// the labels left on existingNode afterwards.
func TestUpdateDefaultLabels(t *testing.T) {
	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
	// A single kubelet is shared by all cases, so it is cleaned up exactly
	// once; deferring inside the loop would queue one Cleanup per case.
	defer testKubelet.Cleanup()
	testKubelet.kubelet.kubeClient = nil // ensure only the heartbeat client is used

	cases := []struct {
		name         string
		initialNode  *v1.Node
		existingNode *v1.Node
		needsUpdate  bool
		finalLabels  map[string]string
	}{
		{
			name: "make sure default labels exist",
			initialNode: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						kubeletapis.LabelHostname:          "new-hostname",
						kubeletapis.LabelZoneFailureDomain: "new-zone-failure-domain",
						kubeletapis.LabelZoneRegion:        "new-zone-region",
						kubeletapis.LabelInstanceType:      "new-instance-type",
						kubeletapis.LabelOS:                "new-os",
						kubeletapis.LabelArch:              "new-arch",
					},
				},
			},
			existingNode: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{},
				},
			},
			needsUpdate: true,
			finalLabels: map[string]string{
				kubeletapis.LabelHostname:          "new-hostname",
				kubeletapis.LabelZoneFailureDomain: "new-zone-failure-domain",
				kubeletapis.LabelZoneRegion:        "new-zone-region",
				kubeletapis.LabelInstanceType:      "new-instance-type",
				kubeletapis.LabelOS:                "new-os",
				kubeletapis.LabelArch:              "new-arch",
			},
		},
		{
			name: "make sure default labels are up to date",
			initialNode: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						kubeletapis.LabelHostname:          "new-hostname",
						kubeletapis.LabelZoneFailureDomain: "new-zone-failure-domain",
						kubeletapis.LabelZoneRegion:        "new-zone-region",
						kubeletapis.LabelInstanceType:      "new-instance-type",
						kubeletapis.LabelOS:                "new-os",
						kubeletapis.LabelArch:              "new-arch",
					},
				},
			},
			existingNode: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						kubeletapis.LabelHostname:          "old-hostname",
						kubeletapis.LabelZoneFailureDomain: "old-zone-failure-domain",
						kubeletapis.LabelZoneRegion:        "old-zone-region",
						kubeletapis.LabelInstanceType:      "old-instance-type",
						kubeletapis.LabelOS:                "old-os",
						kubeletapis.LabelArch:              "old-arch",
					},
				},
			},
			needsUpdate: true,
			finalLabels: map[string]string{
				kubeletapis.LabelHostname:          "new-hostname",
				kubeletapis.LabelZoneFailureDomain: "new-zone-failure-domain",
				kubeletapis.LabelZoneRegion:        "new-zone-region",
				kubeletapis.LabelInstanceType:      "new-instance-type",
				kubeletapis.LabelOS:                "new-os",
				kubeletapis.LabelArch:              "new-arch",
			},
		},
		{
			name: "make sure existing labels do not get deleted",
			initialNode: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						kubeletapis.LabelHostname:          "new-hostname",
						kubeletapis.LabelZoneFailureDomain: "new-zone-failure-domain",
						kubeletapis.LabelZoneRegion:        "new-zone-region",
						kubeletapis.LabelInstanceType:      "new-instance-type",
						kubeletapis.LabelOS:                "new-os",
						kubeletapis.LabelArch:              "new-arch",
					},
				},
			},
			existingNode: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						kubeletapis.LabelHostname:          "new-hostname",
						kubeletapis.LabelZoneFailureDomain: "new-zone-failure-domain",
						kubeletapis.LabelZoneRegion:        "new-zone-region",
						kubeletapis.LabelInstanceType:      "new-instance-type",
						kubeletapis.LabelOS:                "new-os",
						kubeletapis.LabelArch:              "new-arch",
						"please-persist":                   "foo",
					},
				},
			},
			needsUpdate: false,
			finalLabels: map[string]string{
				kubeletapis.LabelHostname:          "new-hostname",
				kubeletapis.LabelZoneFailureDomain: "new-zone-failure-domain",
				kubeletapis.LabelZoneRegion:        "new-zone-region",
				kubeletapis.LabelInstanceType:      "new-instance-type",
				kubeletapis.LabelOS:                "new-os",
				kubeletapis.LabelArch:              "new-arch",
				"please-persist":                   "foo",
			},
		},
		{
			name: "make sure existing labels do not get deleted when initial node has no opinion",
			initialNode: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{},
				},
			},
			existingNode: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						kubeletapis.LabelHostname:          "new-hostname",
						kubeletapis.LabelZoneFailureDomain: "new-zone-failure-domain",
						kubeletapis.LabelZoneRegion:        "new-zone-region",
						kubeletapis.LabelInstanceType:      "new-instance-type",
						kubeletapis.LabelOS:                "new-os",
						kubeletapis.LabelArch:              "new-arch",
						"please-persist":                   "foo",
					},
				},
			},
			needsUpdate: false,
			finalLabels: map[string]string{
				kubeletapis.LabelHostname:          "new-hostname",
				kubeletapis.LabelZoneFailureDomain: "new-zone-failure-domain",
				kubeletapis.LabelZoneRegion:        "new-zone-region",
				kubeletapis.LabelInstanceType:      "new-instance-type",
				kubeletapis.LabelOS:                "new-os",
				kubeletapis.LabelArch:              "new-arch",
				"please-persist":                   "foo",
			},
		},
		{
			name: "no update needed",
			initialNode: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						kubeletapis.LabelHostname:          "new-hostname",
						kubeletapis.LabelZoneFailureDomain: "new-zone-failure-domain",
						kubeletapis.LabelZoneRegion:        "new-zone-region",
						kubeletapis.LabelInstanceType:      "new-instance-type",
						kubeletapis.LabelOS:                "new-os",
						kubeletapis.LabelArch:              "new-arch",
					},
				},
			},
			existingNode: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						kubeletapis.LabelHostname:          "new-hostname",
						kubeletapis.LabelZoneFailureDomain: "new-zone-failure-domain",
						kubeletapis.LabelZoneRegion:        "new-zone-region",
						kubeletapis.LabelInstanceType:      "new-instance-type",
						kubeletapis.LabelOS:                "new-os",
						kubeletapis.LabelArch:              "new-arch",
					},
				},
			},
			needsUpdate: false,
			finalLabels: map[string]string{
				kubeletapis.LabelHostname:          "new-hostname",
				kubeletapis.LabelZoneFailureDomain: "new-zone-failure-domain",
				kubeletapis.LabelZoneRegion:        "new-zone-region",
				kubeletapis.LabelInstanceType:      "new-instance-type",
				kubeletapis.LabelOS:                "new-os",
				kubeletapis.LabelArch:              "new-arch",
			},
		},
		{
			name: "not panic when existing node has nil labels",
			initialNode: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						kubeletapis.LabelHostname:          "new-hostname",
						kubeletapis.LabelZoneFailureDomain: "new-zone-failure-domain",
						kubeletapis.LabelZoneRegion:        "new-zone-region",
						kubeletapis.LabelInstanceType:      "new-instance-type",
						kubeletapis.LabelOS:                "new-os",
						kubeletapis.LabelArch:              "new-arch",
					},
				},
			},
			existingNode: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{},
			},
			needsUpdate: true,
			finalLabels: map[string]string{
				kubeletapis.LabelHostname:          "new-hostname",
				kubeletapis.LabelZoneFailureDomain: "new-zone-failure-domain",
				kubeletapis.LabelZoneRegion:        "new-zone-region",
				kubeletapis.LabelInstanceType:      "new-instance-type",
				kubeletapis.LabelOS:                "new-os",
				kubeletapis.LabelArch:              "new-arch",
			},
		},
	}

	kubelet := testKubelet.kubelet
	for _, tc := range cases {
		needsUpdate := kubelet.updateDefaultLabels(tc.initialNode, tc.existingNode)
		assert.Equal(t, tc.needsUpdate, needsUpdate, tc.name)
		assert.Equal(t, tc.finalLabels, tc.existingNode.Labels, tc.name)
	}
}
// TestReconcileExtendedResource runs reconcileExtendedResource with an empty
// initial node against each existingNode in the table, then checks the
// returned "needs update" flag and the resulting existingNode. The second case
// expects the two "test.com/..." extended resources to be set to zero in both
// Capacity and Allocatable.
func TestReconcileExtendedResource(t *testing.T) {
	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
	// A single kubelet is shared by all cases, so it is cleaned up exactly
	// once; deferring inside the loop would queue one Cleanup per case.
	defer testKubelet.Cleanup()
	testKubelet.kubelet.kubeClient = nil // ensure only the heartbeat client is used
	extendedResourceName1 := v1.ResourceName("test.com/resource1")
	extendedResourceName2 := v1.ResourceName("test.com/resource2")

	cases := []struct {
		name         string
		existingNode *v1.Node
		expectedNode *v1.Node
		needsUpdate  bool
	}{
		{
			name: "no update needed without extended resource",
			existingNode: &v1.Node{
				Status: v1.NodeStatus{
					Capacity: v1.ResourceList{
						v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
						v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
						v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
					},
					Allocatable: v1.ResourceList{
						v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
						v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
						v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
					},
				},
			},
			expectedNode: &v1.Node{
				Status: v1.NodeStatus{
					Capacity: v1.ResourceList{
						v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
						v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
						v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
					},
					Allocatable: v1.ResourceList{
						v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
						v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
						v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
					},
				},
			},
			needsUpdate: false,
		},
		{
			name: "extended resource capacity is zeroed",
			existingNode: &v1.Node{
				Status: v1.NodeStatus{
					Capacity: v1.ResourceList{
						v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
						v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
						v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
						extendedResourceName1:       *resource.NewQuantity(int64(2), resource.DecimalSI),
						extendedResourceName2:       *resource.NewQuantity(int64(10), resource.DecimalSI),
					},
					Allocatable: v1.ResourceList{
						v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
						v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
						v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
						extendedResourceName1:       *resource.NewQuantity(int64(2), resource.DecimalSI),
						extendedResourceName2:       *resource.NewQuantity(int64(10), resource.DecimalSI),
					},
				},
			},
			expectedNode: &v1.Node{
				Status: v1.NodeStatus{
					Capacity: v1.ResourceList{
						v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
						v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
						v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
						extendedResourceName1:       *resource.NewQuantity(int64(0), resource.DecimalSI),
						extendedResourceName2:       *resource.NewQuantity(int64(0), resource.DecimalSI),
					},
					Allocatable: v1.ResourceList{
						v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
						v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
						v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
						extendedResourceName1:       *resource.NewQuantity(int64(0), resource.DecimalSI),
						extendedResourceName2:       *resource.NewQuantity(int64(0), resource.DecimalSI),
					},
				},
			},
			needsUpdate: true,
		},
	}

	kubelet := testKubelet.kubelet
	for _, tc := range cases {
		// A fresh empty initial node is used for every case.
		initialNode := &v1.Node{}
		needsUpdate := kubelet.reconcileExtendedResource(initialNode, tc.existingNode)
		assert.Equal(t, tc.needsUpdate, needsUpdate, tc.name)
		assert.Equal(t, tc.expectedNode, tc.existingNode, tc.name)
	}
}
// TestValidateNodeIPParam checks validateNodeIP against a fixed list of
// addresses that must be rejected (unset, loopback, multicast, link-local,
// unspecified, and an address not assigned to the host), plus one success case
// for every host interface address that is neither loopback nor link-local
// unicast.
func TestValidateNodeIPParam(t *testing.T) {
	type test struct {
		nodeIP   string
		success  bool
		testName string
	}
	tests := []test{
		{
			nodeIP:   "",
			success:  false,
			testName: "IP not set",
		},
		{
			nodeIP:   "127.0.0.1",
			success:  false,
			testName: "IPv4 loopback address",
		},
		{
			nodeIP:   "::1",
			success:  false,
			testName: "IPv6 loopback address",
		},
		{
			nodeIP:   "224.0.0.1",
			success:  false,
			testName: "multicast IPv4 address",
		},
		{
			nodeIP:   "ff00::1",
			success:  false,
			testName: "multicast IPv6 address",
		},
		{
			nodeIP:   "169.254.0.1",
			success:  false,
			testName: "IPv4 link-local unicast address",
		},
		{
			nodeIP:   "fe80::0202:b3ff:fe1e:8329",
			success:  false,
			testName: "IPv6 link-local unicast address",
		},
		{
			nodeIP:   "0.0.0.0",
			success:  false,
			testName: "Unspecified IPv4 address",
		},
		{
			nodeIP:   "::",
			success:  false,
			testName: "Unspecified IPv6 address",
		},
		{
			nodeIP:   "1.2.3.4",
			success:  false,
			testName: "IPv4 address that doesn't belong to host",
		},
	}

	addrs, err := net.InterfaceAddrs()
	// Report a lookup failure as a test failure. The static rejection cases
	// above are still exercised below because addrs is simply empty then.
	assert.NoError(t, err, "Unable to obtain a list of the node's unicast interface addresses.")
	for _, addr := range addrs {
		var ip net.IP
		switch v := addr.(type) {
		case *net.IPNet:
			ip = v.IP
		case *net.IPAddr:
			ip = v.IP
		}
		// Skip unusable addresses and keep scanning. A `break` here would
		// leave the whole loop at the first loopback or link-local address,
		// so no success case would be generated for later interfaces.
		// ip stays nil for address types not handled by the switch above.
		if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() {
			continue
		}
		successTest := test{
			nodeIP:   ip.String(),
			success:  true,
			testName: fmt.Sprintf("Success test case for address %s", ip.String()),
		}
		tests = append(tests, successTest)
	}

	for _, test := range tests {
		err := validateNodeIP(net.ParseIP(test.nodeIP))
		if test.success {
			assert.NoError(t, err, "test %s", test.testName)
		} else {
			assert.Error(t, err, "test %s", test.testName)
		}
	}
}
// TestRegisterWithApiServerWithTaint registers a kubelet whose
// registerSchedulable flag is false and inspects the node object handed to the
// fake client's create call. The body runs via forEachFeatureGate for
// TaintNodesByCondition and requires that the unschedulable NoSchedule taint
// is present on the created node exactly when that feature gate is enabled.
func TestRegisterWithApiServerWithTaint(t *testing.T) {
	tk := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
	defer tk.Cleanup()

	kl := tk.kubelet
	kl.machineInfo = &cadvisorapi.MachineInfo{
		MachineID:      "123",
		SystemUUID:     "abc",
		BootID:         "1b3",
		NumCores:       2,
		MemoryCapacity: 1024,
	}

	// Capture whatever object the kubelet submits on node creation and echo
	// it back as the created object.
	var createdObj runtime.Object
	client := tk.fakeKubeClient
	client.AddReactor("create", "nodes", func(action core.Action) (bool, runtime.Object, error) {
		createdObj = action.(core.CreateAction).GetObject()
		return true, createdObj, nil
	})
	addNotImplatedReaction(client)

	// Make node to be unschedulable.
	kl.registerSchedulable = false

	gates := []utilfeature.Feature{features.TaintNodesByCondition}
	forEachFeatureGate(t, gates, func(t *testing.T) {
		// Reset kubelet status for each test.
		kl.registrationCompleted = false

		// Register node to apiserver.
		kl.registerWithAPIServer()

		// Check the unschedulable taint.
		node := createdObj.(*v1.Node)
		wantTaint := &v1.Taint{
			Key:    schedulerapi.TaintNodeUnschedulable,
			Effect: v1.TaintEffectNoSchedule,
		}
		gateEnabled := utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition)
		hasTaint := taintutil.TaintExists(node.Spec.Taints, wantTaint)
		require.Equal(t, gateEnabled, hasTaint,
			"test unschedulable taint for TaintNodesByCondition")
	})
}
// TestNodeStatusHasChanged is a table-driven test of nodeStatusHasChanged.
// Each case supplies an original and a new NodeStatus plus the expected
// verdict. After the call, both inputs are compared against deep copies taken
// beforehand to confirm the function did not modify them.
func TestNodeStatusHasChanged(t *testing.T) {
	fakeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
	fakeFuture := metav1.Time{Time: fakeNow.Add(time.Minute)}

	// condition assembles a NodeCondition from its four relevant fields.
	condition := func(kind v1.NodeConditionType, state v1.ConditionStatus, heartbeat, transition metav1.Time) v1.NodeCondition {
		return v1.NodeCondition{
			Type:               kind,
			Status:             state,
			LastHeartbeatTime:  heartbeat,
			LastTransitionTime: transition,
		}
	}
	// withConditions wraps the given conditions in a NodeStatus.
	withConditions := func(conds ...v1.NodeCondition) *v1.NodeStatus {
		return &v1.NodeStatus{Conditions: conds}
	}

	ready := condition(v1.NodeReady, v1.ConditionTrue, fakeNow, fakeNow)
	readyLaterHeartbeat := condition(v1.NodeReady, v1.ConditionTrue, fakeFuture, fakeNow)
	readyLaterTransition := condition(v1.NodeReady, v1.ConditionTrue, fakeFuture, fakeFuture)
	notReady := condition(v1.NodeReady, v1.ConditionFalse, fakeNow, fakeNow)
	memoryPressure := condition(v1.NodeMemoryPressure, v1.ConditionFalse, fakeNow, fakeNow)

	tests := []struct {
		name           string
		originalStatus *v1.NodeStatus
		status         *v1.NodeStatus
		expectChange   bool
	}{
		{
			name:           "Node status does not change with nil status.",
			originalStatus: nil,
			status:         nil,
			expectChange:   false,
		},
		{
			name:           "Node status does not change with default status.",
			originalStatus: &v1.NodeStatus{},
			status:         &v1.NodeStatus{},
			expectChange:   false,
		},
		{
			name:           "Node status changes with nil and default status.",
			originalStatus: nil,
			status:         &v1.NodeStatus{},
			expectChange:   true,
		},
		{
			name:           "Node status changes with nil and status.",
			originalStatus: nil,
			status:         withConditions(ready, memoryPressure),
			expectChange:   true,
		},
		{
			// Explicit empty (non-nil) slices on both sides.
			name:           "Node status does not change with empty conditions.",
			originalStatus: &v1.NodeStatus{Conditions: []v1.NodeCondition{}},
			status:         &v1.NodeStatus{Conditions: []v1.NodeCondition{}},
			expectChange:   false,
		},
		{
			name:           "Node status does not change",
			originalStatus: withConditions(ready, memoryPressure),
			status:         withConditions(ready, memoryPressure),
			expectChange:   false,
		},
		{
			name:           "Node status does not change even if heartbeat time changes.",
			originalStatus: withConditions(ready, memoryPressure),
			status:         withConditions(readyLaterHeartbeat, memoryPressure),
			expectChange:   false,
		},
		{
			name:           "Node status does not change even if the orders of conditions are different.",
			originalStatus: withConditions(ready, memoryPressure),
			status:         withConditions(memoryPressure, readyLaterHeartbeat),
			expectChange:   false,
		},
		{
			name:           "Node status changes if condition status differs.",
			originalStatus: withConditions(ready, memoryPressure),
			status:         withConditions(notReady, memoryPressure),
			expectChange:   true,
		},
		{
			name:           "Node status changes if transition time changes.",
			originalStatus: withConditions(ready, memoryPressure),
			status:         withConditions(readyLaterTransition, memoryPressure),
			expectChange:   true,
		},
		{
			name:           "Node status changes with different number of conditions.",
			originalStatus: withConditions(ready),
			status:         withConditions(ready, memoryPressure),
			expectChange:   true,
		},
		{
			name: "Node status changes with different phase.",
			originalStatus: &v1.NodeStatus{
				Phase:      v1.NodePending,
				Conditions: []v1.NodeCondition{ready},
			},
			status: &v1.NodeStatus{
				Phase:      v1.NodeRunning,
				Conditions: []v1.NodeCondition{ready},
			},
			expectChange: true,
		},
	}

	for i := range tests {
		tt := tests[i]
		t.Run(tt.name, func(t *testing.T) {
			// Snapshot both inputs so mutation by the function under test
			// can be detected afterwards.
			originalBefore := tt.originalStatus.DeepCopy()
			statusBefore := tt.status.DeepCopy()

			got := nodeStatusHasChanged(tt.originalStatus, tt.status)

			assert.Equal(t, tt.expectChange, got, "Expect node status change to be %t, but got %t.", tt.expectChange, got)
			assert.True(t, apiequality.Semantic.DeepEqual(originalBefore, tt.originalStatus), "%s", diff.ObjectDiff(originalBefore, tt.originalStatus))
			assert.True(t, apiequality.Semantic.DeepEqual(statusBefore, tt.status), "%s", diff.ObjectDiff(statusBefore, tt.status))
		})
	}
}