/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
	"encoding/json"
	"fmt"
	"sync"

	"go.uber.org/zap"
	v1 "k8s.io/api/core/v1"

	"github.com/apache/yunikorn-k8shim/pkg/cache/external"
	"github.com/apache/yunikorn-k8shim/pkg/common"
	"github.com/apache/yunikorn-k8shim/pkg/common/events"
	"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
	"github.com/apache/yunikorn-k8shim/pkg/log"
	"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
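
// updateType indicates whether an occupied resource update is added to
// or subtracted from a node's occupied resources.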
type updateType int
const (
AddOccupiedResource updateType = iota
SubOccupiedResource
)
// schedulerNodes maintains the cluster nodes and their status for the scheduler
type schedulerNodes struct {
proxy api.SchedulerAPI
nodesMap map[string]*SchedulerNode
cache *external.SchedulerCache
lock *sync.RWMutex
}
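
// newSchedulerNodes creates a schedulerNodes instance backed by the given
// scheduler API proxy and scheduler cache.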
func newSchedulerNodes(schedulerAPI api.SchedulerAPI, cache *external.SchedulerCache) *schedulerNodes {
return &schedulerNodes{
proxy: schedulerAPI,
nodesMap: make(map[string]*SchedulerNode),
cache: cache,
lock: &sync.RWMutex{},
}
}
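
// getNode returns the cached SchedulerNode for the given name,
// or nil if the node is not known to the cache.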
func (nc *schedulerNodes) getNode(name string) *SchedulerNode {
nc.lock.RLock()
defer nc.lock.RUnlock()
if node, ok := nc.nodesMap[name]; ok {
return node
}
return nil
}
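
// convertToNode casts the given object to a *v1.Node, returning an error
// if the object is of a different type.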
func convertToNode(obj interface{}) (*v1.Node, error) {
if node, ok := obj.(*v1.Node); ok {
return node, nil
}
return nil, fmt.Errorf("cannot convert to *v1.Node: %v", obj)
}
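
// equals compares two nodes on their resources only; labels and other
// metadata are not taken into account.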
func equals(n1 *v1.Node, n2 *v1.Node) bool {
n1Resource := common.GetNodeResource(&n1.Status)
n2Resource := common.GetNodeResource(&n2.Status)
return common.Equals(n1Resource, n2Resource)
}
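
// addExistingAllocation adds an existing allocation to the node it belongs to,
// or returns an error if the node is not found in the nodes map.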
func (nc *schedulerNodes) addExistingAllocation(allocation *si.Allocation) error {
nc.lock.Lock()
defer nc.lock.Unlock()
if schedulerNode, ok := nc.nodesMap[allocation.NodeID]; ok {
schedulerNode.addExistingAllocation(allocation)
return nil
}
return fmt.Errorf("orphan allocation %v", allocation)
}
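
// addNode adds a node to the cache and reports it to the scheduler.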
func (nc *schedulerNodes) addNode(node *v1.Node) {
nc.addAndReportNode(node, true)
}
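
// addAndReportNode adds the node to the nodes map if it is not there yet and,
// when reportNode is true, triggers the node recovery process.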
func (nc *schedulerNodes) addAndReportNode(node *v1.Node, reportNode bool) {
nc.lock.Lock()
defer nc.lock.Unlock()
// add node to nodes map
if _, ok := nc.nodesMap[node.Name]; !ok {
		var nodeLabels []byte
		nodeLabels, err := json.Marshal(node.Labels) // a nil labels map encodes as the JSON value "null"
		if err != nil {
			log.Logger().Error("failed to marshal node labels to json", zap.Error(err))
			nodeLabels = make([]byte, 0)
		}
log.Logger().Info("adding node to context",
zap.String("nodeName", node.Name),
zap.String("nodeLabels", string(nodeLabels)),
zap.Bool("schedulable", !node.Spec.Unschedulable))
ready := hasReadyCondition(node)
newNode := newSchedulerNode(node.Name, string(node.UID), string(nodeLabels),
common.GetNodeResource(&node.Status), nc.proxy, !node.Spec.Unschedulable, ready)
nc.nodesMap[node.Name] = newNode
}
	// once a node is added to the scheduler, the first thing to do is to recover its state.
	// the node might already be in Healthy state if it was recovered during the recovery
	// process; in that case do not trigger the recovery again.
	if reportNode {
		if cachedNode, ok := nc.nodesMap[node.Name]; ok {
			triggerEvent(cachedNode, SchedulerNodeStates().New, RecoverNode)
		}
	}
}
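
// updateNodeOccupiedResources adjusts the occupied resources of the named node
// and reports the updated node to the scheduler. Zero resource updates are ignored.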
func (nc *schedulerNodes) updateNodeOccupiedResources(name string, resource *si.Resource, opt updateType) {
if common.IsZero(resource) {
return
}
if schedulerNode := nc.getNode(name); schedulerNode != nil {
schedulerNode.updateOccupiedResource(resource, opt)
request := common.CreateUpdateRequestForUpdatedNode(name, schedulerNode.capacity, schedulerNode.occupied, schedulerNode.ready)
log.Logger().Info("report occupied resources updates",
zap.String("node", schedulerNode.name),
zap.Any("request", request))
if err := nc.proxy.UpdateNode(&request); err != nil {
log.Logger().Info("hitting error while handling UpdateNode", zap.Error(err))
}
}
}
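
// updateNode handles a K8s node update: it adds unknown nodes, handles
// cordon/uncordon transitions, and reports capacity or readiness changes
// to the scheduler.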
func (nc *schedulerNodes) updateNode(oldNode, newNode *v1.Node) {
	// before updating a node, check whether it exists in the cache;
	// if we receive an update event for a node that is not cached,
	// we need to add it instead of updating it.
cachedNode := nc.getNode(newNode.Name)
if cachedNode == nil {
nc.addNode(newNode)
return
}
nc.lock.Lock()
defer nc.lock.Unlock()
// cordon or restore node
if (!oldNode.Spec.Unschedulable) && newNode.Spec.Unschedulable {
triggerEvent(cachedNode, SchedulerNodeStates().Healthy, DrainNode)
} else if oldNode.Spec.Unschedulable && !newNode.Spec.Unschedulable {
triggerEvent(cachedNode, SchedulerNodeStates().Draining, RestoreNode)
}
	ready := hasReadyCondition(newNode)
	capacityUnchanged := equals(oldNode, newNode)
	readyUnchanged := cachedNode.ready == ready
	if capacityUnchanged && readyUnchanged {
		return
	}
	// has the node capacity changed?
	if !capacityUnchanged {
		cachedNode.setCapacity(common.GetNodeResource(&newNode.Status))
	}
	// has the node ready condition changed?
	if !readyUnchanged {
		cachedNode.setReadyStatus(ready)
	}
log.Logger().Info("Node's ready status flag", zap.String("Node name", newNode.Name),
zap.Bool("ready", ready))
request := common.CreateUpdateRequestForUpdatedNode(newNode.Name, cachedNode.capacity,
cachedNode.occupied, cachedNode.ready)
log.Logger().Info("report updated nodes to scheduler", zap.Any("request", request))
if err := nc.proxy.UpdateNode(&request); err != nil {
log.Logger().Info("hitting error while handling UpdateNode", zap.Error(err))
}
}
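
// deleteNode removes the node from the cache and reports the decommission
// to the scheduler.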
func (nc *schedulerNodes) deleteNode(node *v1.Node) {
nc.lock.Lock()
defer nc.lock.Unlock()
delete(nc.nodesMap, node.Name)
request := common.CreateUpdateRequestForDeleteOrRestoreNode(node.Name, si.NodeInfo_DECOMISSION)
log.Logger().Info("report updated nodes to scheduler", zap.Any("request", request.String()))
if err := nc.proxy.UpdateNode(&request); err != nil {
log.Logger().Error("hitting error while handling UpdateNode", zap.Error(err))
}
}
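
// schedulerNodeEventHandler returns a handler function that routes
// SchedulerNodeEvents to the matching cached node.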
func (nc *schedulerNodes) schedulerNodeEventHandler() func(obj interface{}) {
return func(obj interface{}) {
if event, ok := obj.(events.SchedulerNodeEvent); ok {
if node := nc.getNode(event.GetNodeID()); node != nil {
if node.canHandle(event) {
if err := node.handle(event); err != nil {
log.Logger().Error("failed to handle scheduler node event",
zap.String("event", event.GetEvent()),
zap.Error(err))
}
}
}
}
}
}
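
// hasReadyCondition returns true if the node has a Ready condition with status True.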
func hasReadyCondition(node *v1.Node) bool {
if node != nil && node.Status.String() != "nil" {
for _, condition := range node.Status.Conditions {
if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
return true
}
}
}
return false
}
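
// triggerEvent dispatches the given event for the node, but only if the node
// is currently in the expected state.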
func triggerEvent(node *SchedulerNode, currentState string, eventType SchedulerNodeEventType) {
log.Logger().Info("scheduler node event ", zap.String("name", node.name),
zap.String("current state ", currentState), zap.Stringer("transition to ", eventType))
if node.getNodeState() == currentState {
dispatcher.Dispatch(CachedSchedulerNodeEvent{
NodeID: node.name,
Event: eventType,
})
}
}