| /* |
| Copyright 2017 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package kubemark |
| |
| import ( |
| "fmt" |
| "math/rand" |
| "sync" |
| "time" |
| |
| apiv1 "k8s.io/api/core/v1" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/fields" |
| "k8s.io/apimachinery/pkg/labels" |
| "k8s.io/client-go/informers" |
| informersv1 "k8s.io/client-go/informers/core/v1" |
| kubeclient "k8s.io/client-go/kubernetes" |
| listersv1 "k8s.io/client-go/listers/core/v1" |
| "k8s.io/client-go/tools/cache" |
| "k8s.io/kubernetes/pkg/controller" |
| |
| "k8s.io/klog" |
| ) |
| |
| const ( |
| namespaceKubemark = "kubemark" |
| nodeGroupLabel = "autoscaling.k8s.io/nodegroup" |
| numRetries = 3 |
| ) |
| |
// KubemarkController is a simplified cloud provider for kubemark. It allows
// adding and deleting nodes from a kubemark cluster, and introduces node groups
// by applying labels to kubemark's hollow nodes.
| type KubemarkController struct { |
| nodeTemplate *apiv1.ReplicationController |
| externalCluster externalCluster |
| kubemarkCluster kubemarkCluster |
| rand *rand.Rand |
| createNodeQueue chan string |
| nodeGroupQueueSize map[string]int |
| nodeGroupQueueSizeLock sync.Mutex |
| } |
| |
// externalCluster is used to communicate with the external cluster that hosts
// kubemark. It lists, creates and deletes hollow nodes by manipulating their
// replication controllers.
| type externalCluster struct { |
| rcLister listersv1.ReplicationControllerLister |
| rcSynced cache.InformerSynced |
| podLister listersv1.PodLister |
| podSynced cache.InformerSynced |
| client kubeclient.Interface |
| } |
| |
// kubemarkCluster is used to delete nodes from the kubemark cluster once their
// respective replication controllers have been deleted and the nodes have
// become unready. This covers for the fact that there is no proper cloud
// provider for kubemark that would take care of deleting the nodes.
| type kubemarkCluster struct { |
| client kubeclient.Interface |
| nodeLister listersv1.NodeLister |
| nodeSynced cache.InformerSynced |
| nodesToDelete map[string]bool |
| nodesToDeleteLock sync.Mutex |
| } |
| |
// NewKubemarkController creates a KubemarkController using the provided clients to talk to external
| // and kubemark clusters. |
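//
// A minimal wiring sketch (an assumption about typical usage, not part of this
// package; clients and stopCh come from the caller, error handling elided):
//
//	externalFactory := informers.NewSharedInformerFactory(externalClient, 0)
//	kubemarkFactory := informers.NewSharedInformerFactory(kubemarkClient, 0)
//	ctrl, _ := NewKubemarkController(externalClient, externalFactory,
//		kubemarkClient, kubemarkFactory.Core().V1().Nodes())
//	externalFactory.Start(stopCh)
//	kubemarkFactory.Start(stopCh)
//	if ctrl.WaitForCacheSync(stopCh) {
//		go ctrl.Run(stopCh)
//	}
//
// Note that the factories are started after the controller is created, so the
// informers registered by NewKubemarkController are included in Start.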
| func NewKubemarkController(externalClient kubeclient.Interface, externalInformerFactory informers.SharedInformerFactory, |
| kubemarkClient kubeclient.Interface, kubemarkNodeInformer informersv1.NodeInformer) (*KubemarkController, error) { |
| rcInformer := externalInformerFactory.InformerFor(&apiv1.ReplicationController{}, newReplicationControllerInformer) |
| podInformer := externalInformerFactory.InformerFor(&apiv1.Pod{}, newPodInformer) |
	kubemarkController := &KubemarkController{
| externalCluster: externalCluster{ |
| rcLister: listersv1.NewReplicationControllerLister(rcInformer.GetIndexer()), |
| rcSynced: rcInformer.HasSynced, |
| podLister: listersv1.NewPodLister(podInformer.GetIndexer()), |
| podSynced: podInformer.HasSynced, |
| client: externalClient, |
| }, |
| kubemarkCluster: kubemarkCluster{ |
| nodeLister: kubemarkNodeInformer.Lister(), |
| nodeSynced: kubemarkNodeInformer.Informer().HasSynced, |
| client: kubemarkClient, |
| nodesToDelete: make(map[string]bool), |
| nodesToDeleteLock: sync.Mutex{}, |
| }, |
| rand: rand.New(rand.NewSource(time.Now().UnixNano())), |
| createNodeQueue: make(chan string, 1000), |
| nodeGroupQueueSize: make(map[string]int), |
| nodeGroupQueueSizeLock: sync.Mutex{}, |
| } |
| |
	kubemarkNodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: kubemarkController.kubemarkCluster.removeUnneededNodes,
	})

	return kubemarkController, nil
| } |
| |
| // WaitForCacheSync waits until all caches in the controller are populated. |
| func (kubemarkController *KubemarkController) WaitForCacheSync(stopCh chan struct{}) bool { |
| return controller.WaitForCacheSync("kubemark", stopCh, |
| kubemarkController.externalCluster.rcSynced, |
| kubemarkController.externalCluster.podSynced, |
| kubemarkController.kubemarkCluster.nodeSynced) |
| } |
| |
// Run populates the node template needed for creation of kubemark nodes,
// starts the worker routine for creating new nodes, and then blocks until
// stopCh is closed.
| func (kubemarkController *KubemarkController) Run(stopCh chan struct{}) { |
| nodeTemplate, err := kubemarkController.getNodeTemplate() |
| if err != nil { |
		klog.Fatalf("failed to get node template: %v", err)
| } |
| kubemarkController.nodeTemplate = nodeTemplate |
| |
| go kubemarkController.runNodeCreation(stopCh) |
| <-stopCh |
| } |
| |
// GetNodeNamesForNodeGroup returns the names of the nodes in the given node group.
| func (kubemarkController *KubemarkController) GetNodeNamesForNodeGroup(nodeGroup string) ([]string, error) { |
| selector := labels.SelectorFromSet(labels.Set{nodeGroupLabel: nodeGroup}) |
| pods, err := kubemarkController.externalCluster.podLister.List(selector) |
| if err != nil { |
| return nil, err |
| } |
| result := make([]string, 0, len(pods)) |
| for _, pod := range pods { |
| result = append(result, pod.ObjectMeta.Name) |
| } |
| return result, nil |
| } |
| |
// GetNodeGroupSize returns the current observed size of the node group, i.e.
// the number of hollow node replication controllers labeled with its name.
func (kubemarkController *KubemarkController) GetNodeGroupSize(nodeGroup string) (int, error) {
	selector := labels.SelectorFromSet(labels.Set{nodeGroupLabel: nodeGroup})
	rcs, err := kubemarkController.externalCluster.rcLister.List(selector)
	if err != nil {
		return 0, err
	}
	return len(rcs), nil
}
| |
// GetNodeGroupTargetSize returns the target size of the node group: the sum of
// the current observed size and the number of upcoming nodes still queued for
// creation.
| func (kubemarkController *KubemarkController) GetNodeGroupTargetSize(nodeGroup string) (int, error) { |
| kubemarkController.nodeGroupQueueSizeLock.Lock() |
| defer kubemarkController.nodeGroupQueueSizeLock.Unlock() |
| realSize, err := kubemarkController.GetNodeGroupSize(nodeGroup) |
| if err != nil { |
| return realSize, err |
| } |
| return realSize + kubemarkController.nodeGroupQueueSize[nodeGroup], nil |
| } |
| |
// SetNodeGroupSize changes the size of the node group by adding or removing nodes.
| func (kubemarkController *KubemarkController) SetNodeGroupSize(nodeGroup string, size int) error { |
| currSize, err := kubemarkController.GetNodeGroupTargetSize(nodeGroup) |
| if err != nil { |
| return err |
| } |
| switch delta := size - currSize; { |
| case delta < 0: |
| absDelta := -delta |
| nodes, err := kubemarkController.GetNodeNamesForNodeGroup(nodeGroup) |
| if err != nil { |
| return err |
| } |
| if len(nodes) < absDelta { |
| return fmt.Errorf("can't remove %d nodes from %s nodegroup, not enough nodes: %d", absDelta, nodeGroup, len(nodes)) |
| } |
		for _, node := range nodes[:absDelta] {
			if err := kubemarkController.RemoveNodeFromNodeGroup(nodeGroup, node); err != nil {
				return err
			}
		}
| case delta > 0: |
| kubemarkController.nodeGroupQueueSizeLock.Lock() |
| for i := 0; i < delta; i++ { |
| kubemarkController.nodeGroupQueueSize[nodeGroup]++ |
| kubemarkController.createNodeQueue <- nodeGroup |
| } |
| kubemarkController.nodeGroupQueueSizeLock.Unlock() |
| } |
| |
| return nil |
| } |
| |
| // GetNodeGroupForNode returns the name of the node group to which the node |
| // belongs. |
| func (kubemarkController *KubemarkController) GetNodeGroupForNode(node string) (string, error) { |
| pod := kubemarkController.getPodByName(node) |
| if pod == nil { |
| return "", fmt.Errorf("node %s does not exist", node) |
| } |
| nodeGroup, ok := pod.ObjectMeta.Labels[nodeGroupLabel] |
| if ok { |
| return nodeGroup, nil |
| } |
| return "", fmt.Errorf("can't find nodegroup for node %s due to missing label %s", node, nodeGroupLabel) |
| } |
| |
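// addNodeToNodeGroup creates a new hollow node in the given node group by
// creating a dedicated replication controller (with a single replica) from the
// node template, retrying the create call up to numRetries times.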
| func (kubemarkController *KubemarkController) addNodeToNodeGroup(nodeGroup string) error { |
| node := kubemarkController.nodeTemplate.DeepCopy() |
| node.Name = fmt.Sprintf("%s-%d", nodeGroup, kubemarkController.rand.Int63()) |
| node.Labels = map[string]string{nodeGroupLabel: nodeGroup, "name": node.Name} |
| node.Spec.Template.Labels = node.Labels |
| |
| var err error |
| for i := 0; i < numRetries; i++ { |
| _, err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(node.Namespace).Create(node) |
| if err == nil { |
| return nil |
| } |
| } |
| |
| return err |
| } |
| |
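// RemoveNodeFromNodeGroup deletes a hollow node from the given node group by
// deleting its replication controller in the external cluster and marking the
// node for deletion from the kubemark cluster once it becomes unready.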
| func (kubemarkController *KubemarkController) RemoveNodeFromNodeGroup(nodeGroup string, node string) error { |
| pod := kubemarkController.getPodByName(node) |
| if pod == nil { |
| klog.Warningf("Can't delete node %s from nodegroup %s. Node does not exist.", node, nodeGroup) |
| return nil |
| } |
| if pod.ObjectMeta.Labels[nodeGroupLabel] != nodeGroup { |
| return fmt.Errorf("can't delete node %s from nodegroup %s. Node is not in nodegroup", node, nodeGroup) |
| } |
| policy := metav1.DeletePropagationForeground |
| var err error |
| for i := 0; i < numRetries; i++ { |
| err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(namespaceKubemark).Delete( |
| pod.ObjectMeta.Labels["name"], |
| &metav1.DeleteOptions{PropagationPolicy: &policy}) |
| if err == nil { |
| klog.Infof("marking node %s for deletion", node) |
| // Mark node for deletion from kubemark cluster. |
| // Once it becomes unready after replication controller |
| // deletion has been noticed, we will delete it explicitly. |
| // This is to cover for the fact that kubemark does not |
| // take care of this itself. |
| kubemarkController.kubemarkCluster.markNodeForDeletion(node) |
| return nil |
| } |
| } |
| return fmt.Errorf("Failed to delete node %s: %v", node, err) |
| } |
| |
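// getReplicationControllerByName returns the replication controller with the
// given name from the lister cache, or nil if none is found.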
| func (kubemarkController *KubemarkController) getReplicationControllerByName(name string) *apiv1.ReplicationController { |
| rcs, err := kubemarkController.externalCluster.rcLister.List(labels.Everything()) |
| if err != nil { |
| return nil |
| } |
| for _, rc := range rcs { |
| if rc.ObjectMeta.Name == name { |
| return rc |
| } |
| } |
| return nil |
| } |
| |
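// getPodByName returns the hollow node pod with the given name from the lister
// cache, or nil if none is found.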
| func (kubemarkController *KubemarkController) getPodByName(name string) *apiv1.Pod { |
| pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything()) |
| if err != nil { |
| return nil |
| } |
| for _, pod := range pods { |
| if pod.ObjectMeta.Name == name { |
| return pod |
| } |
| } |
| return nil |
| } |
| |
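// getNodeNameForPod returns the value of the "name" label of the given pod,
// which matches the name of the replication controller managing it.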
| func (kubemarkController *KubemarkController) getNodeNameForPod(podName string) (string, error) { |
| pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything()) |
| if err != nil { |
| return "", err |
| } |
| for _, pod := range pods { |
| if pod.ObjectMeta.Name == podName { |
| return pod.Labels["name"], nil |
| } |
| } |
| return "", fmt.Errorf("pod %s not found", podName) |
| } |
| |
| // getNodeTemplate returns the template for hollow node replication controllers |
| // by looking for an existing hollow node specification. This requires at least |
| // one kubemark node to be present on startup. |
| func (kubemarkController *KubemarkController) getNodeTemplate() (*apiv1.ReplicationController, error) { |
| podName, err := kubemarkController.kubemarkCluster.getHollowNodeName() |
| if err != nil { |
| return nil, err |
| } |
| hollowNodeName, err := kubemarkController.getNodeNameForPod(podName) |
| if err != nil { |
| return nil, err |
| } |
| if hollowNode := kubemarkController.getReplicationControllerByName(hollowNodeName); hollowNode != nil { |
| nodeTemplate := &apiv1.ReplicationController{ |
| Spec: apiv1.ReplicationControllerSpec{ |
| Template: hollowNode.Spec.Template, |
| }, |
| } |
| |
| nodeTemplate.Spec.Selector = nil |
| nodeTemplate.Namespace = namespaceKubemark |
| one := int32(1) |
| nodeTemplate.Spec.Replicas = &one |
| |
| return nodeTemplate, nil |
| } |
| return nil, fmt.Errorf("can't get hollow node template") |
| } |
| |
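// runNodeCreation is the worker loop that consumes node group names from
// createNodeQueue and creates one hollow node per queued item until stop is
// closed.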
| func (kubemarkController *KubemarkController) runNodeCreation(stop <-chan struct{}) { |
| for { |
| select { |
| case nodeGroup := <-kubemarkController.createNodeQueue: |
| kubemarkController.nodeGroupQueueSizeLock.Lock() |
| err := kubemarkController.addNodeToNodeGroup(nodeGroup) |
| if err != nil { |
| klog.Errorf("failed to add node to node group %s: %v", nodeGroup, err) |
| } else { |
| kubemarkController.nodeGroupQueueSize[nodeGroup]-- |
| } |
| kubemarkController.nodeGroupQueueSizeLock.Unlock() |
| case <-stop: |
| return |
| } |
| } |
| } |
| |
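// getHollowNodeName returns the name of an arbitrary hollow node present in
// the kubemark cluster, or an error if there are none.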
| func (kubemarkCluster *kubemarkCluster) getHollowNodeName() (string, error) { |
| nodes, err := kubemarkCluster.nodeLister.List(labels.Everything()) |
| if err != nil { |
| return "", err |
| } |
	if len(nodes) > 0 {
		return nodes[0].Name, nil
	}
| return "", fmt.Errorf("did not find any hollow nodes in the cluster") |
| } |
| |
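// removeUnneededNodes is an informer update handler that deletes a node from
// the kubemark cluster once it has become unready, provided it was previously
// marked for deletion.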
| func (kubemarkCluster *kubemarkCluster) removeUnneededNodes(oldObj interface{}, newObj interface{}) { |
| node, ok := newObj.(*apiv1.Node) |
| if !ok { |
| return |
| } |
| for _, condition := range node.Status.Conditions { |
		// Delete the node if it is unready and has been explicitly
		// marked for deletion.
| if condition.Type == apiv1.NodeReady && condition.Status != apiv1.ConditionTrue { |
| kubemarkCluster.nodesToDeleteLock.Lock() |
| defer kubemarkCluster.nodesToDeleteLock.Unlock() |
| if kubemarkCluster.nodesToDelete[node.Name] { |
| kubemarkCluster.nodesToDelete[node.Name] = false |
| if err := kubemarkCluster.client.CoreV1().Nodes().Delete(node.Name, &metav1.DeleteOptions{}); err != nil { |
| klog.Errorf("failed to delete node %s from kubemark cluster, err: %v", node.Name, err) |
| } |
| } |
| return |
| } |
| } |
| } |
| |
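// markNodeForDeletion records that the given node should be deleted from the
// kubemark cluster once it becomes unready.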
| func (kubemarkCluster *kubemarkCluster) markNodeForDeletion(name string) { |
| kubemarkCluster.nodesToDeleteLock.Lock() |
| defer kubemarkCluster.nodesToDeleteLock.Unlock() |
| kubemarkCluster.nodesToDelete[name] = true |
| } |
| |
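// newReplicationControllerInformer returns a shared informer watching
// replication controllers in the kubemark namespace of the external cluster.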
| func newReplicationControllerInformer(kubeClient kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { |
| rcListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "replicationcontrollers", namespaceKubemark, fields.Everything()) |
| return cache.NewSharedIndexInformer(rcListWatch, &apiv1.ReplicationController{}, resyncPeriod, nil) |
| } |
| |
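// newPodInformer returns a shared informer watching pods in the kubemark
// namespace of the external cluster.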
| func newPodInformer(kubeClient kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { |
| podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespaceKubemark, fields.Everything()) |
| return cache.NewSharedIndexInformer(podListWatch, &apiv1.Pod{}, resyncPeriod, nil) |
| } |