/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
"github.com/apache/incubator-yunikorn-k8shim/pkg/appmgmt/interfaces"
schedulercache "github.com/apache/incubator-yunikorn-k8shim/pkg/cache/external"
"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
"github.com/apache/incubator-yunikorn-k8shim/pkg/common"
"github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants"
"github.com/apache/incubator-yunikorn-k8shim/pkg/common/events"
"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
"github.com/apache/incubator-yunikorn-k8shim/pkg/dispatcher"
"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
"github.com/apache/incubator-yunikorn-k8shim/pkg/plugin/predicates"
"github.com/apache/incubator-yunikorn-k8shim/pkg/plugin/support"
"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
)
// Context maintains the scheduling state, such as applications and their tasks.
type Context struct {
applications map[string]*Application // apps
nodes *schedulerNodes // nodes
schedulerCache *schedulercache.SchedulerCache // external cache
apiProvider client.APIProvider // apis to interact with api-server, scheduler-core, etc
predManager predicates.PredicateManager // K8s predicates
lock *sync.RWMutex // lock
}
// NewContext creates a new scheduling context.
func NewContext(apis client.APIProvider) *Context {
// create the context, note that the order of creation matters:
// the volume binder needs the informers
// the cache needs the informers and the volume binder
// the node controller needs the cache
// the predicate manager needs the cache, the volume binder and the informers
ctx := &Context{
applications: make(map[string]*Application),
apiProvider: apis,
lock: &sync.RWMutex{},
}
// create the cache
ctx.schedulerCache = schedulercache.NewSchedulerCache(apis.GetAPIs())
// init the controllers and plugins (need the cache)
ctx.nodes = newSchedulerNodes(apis.GetAPIs().SchedulerAPI, ctx.schedulerCache)
// create the predicate manager
if !apis.IsTestingMode() {
sharedLister := support.NewSharedLister(ctx.schedulerCache)
clientSet := apis.GetAPIs().KubeClient.GetClientSet()
informerFactory := apis.GetAPIs().InformerFactory
ctx.predManager = predicates.NewPredicateManager(support.NewFrameworkHandle(sharedLister, informerFactory, clientSet))
}
return ctx
}
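// AddSchedulingEventHandlers registers the informer event handlers used for scheduling:
// node handlers, two sets of pod handlers (one feeding the scheduler cache and one feeding
// the node resource coordinator) and the configMap handlers for configuration hot-refresh.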
func (ctx *Context) AddSchedulingEventHandlers() {
ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
Type: client.NodeInformerHandlers,
AddFn: ctx.addNode,
UpdateFn: ctx.updateNode,
DeleteFn: ctx.deleteNode,
})
ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
Type: client.PodInformerHandlers,
FilterFn: ctx.filterPods,
AddFn: ctx.addPodToCache,
UpdateFn: ctx.updatePodInCache,
DeleteFn: ctx.removePodFromCache,
})
nodeCoordinator := newNodeResourceCoordinator(ctx.nodes)
ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
Type: client.PodInformerHandlers,
FilterFn: nodeCoordinator.filterPods,
UpdateFn: nodeCoordinator.updatePod,
DeleteFn: nodeCoordinator.deletePod,
})
ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
Type: client.ConfigMapInformerHandlers,
FilterFn: ctx.filterConfigMaps,
AddFn: ctx.addConfigMaps,
UpdateFn: ctx.updateConfigMaps,
DeleteFn: ctx.deleteConfigMaps,
})
}
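// addNode adds a newly reported node to both the external scheduler cache and the
// internal nodes cache, and posts a NodeAccepted event.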
func (ctx *Context) addNode(obj interface{}) {
node, err := convertToNode(obj)
if err != nil {
log.Logger().Error("node conversion failed", zap.Error(err))
return
}
// add node to secondary scheduler cache
log.Logger().Debug("adding node to cache", zap.String("NodeName", node.Name))
ctx.schedulerCache.AddNode(node)
// add node to internal cache
ctx.nodes.addNode(node)
// post the event
events.GetRecorder().Eventf(node, v1.EventTypeNormal, "NodeAccepted",
fmt.Sprintf("node %s is accepted by the scheduler", node.Name))
}
func (ctx *Context) updateNode(oldObj, newObj interface{}) {
// we only trigger an update when resources change
oldNode, err := convertToNode(oldObj)
if err != nil {
log.Logger().Error("old node conversion failed",
zap.Error(err))
return
}
newNode, err := convertToNode(newObj)
if err != nil {
log.Logger().Error("new node conversion failed",
zap.Error(err))
return
}
// update secondary cache
if err := ctx.schedulerCache.UpdateNode(oldNode, newNode); err != nil {
log.Logger().Error("unable to update node in scheduler cache",
zap.Error(err))
return
}
// update primary cache
ctx.nodes.updateNode(oldNode, newNode)
}
func (ctx *Context) deleteNode(obj interface{}) {
var node *v1.Node
switch t := obj.(type) {
case *v1.Node:
node = t
case cache.DeletedFinalStateUnknown:
var ok bool
node, ok = t.Obj.(*v1.Node)
if !ok {
log.Logger().Error("cannot convert to *v1.Node", zap.Any("object", t.Obj))
return
}
default:
log.Logger().Error("cannot convert to *v1.Node", zap.Any("object", t))
return
}
// delete node from secondary cache
log.Logger().Debug("delete node from cache", zap.String("nodeName", node.Name))
if err := ctx.schedulerCache.RemoveNode(node); err != nil {
log.Logger().Error("unable to delete node from scheduler cache",
zap.Error(err))
return
}
// delete node from primary cache
ctx.nodes.deleteNode(node)
// post the event
events.GetRecorder().Eventf(node, v1.EventTypeNormal, "NodeDeleted",
fmt.Sprintf("node %s is deleted from the scheduler", node.Name))
}
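// addPodToCache adds a pod reported by the pod informer to the external scheduler cache.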
func (ctx *Context) addPodToCache(obj interface{}) {
pod, err := utils.Convert2Pod(obj)
if err != nil {
log.Logger().Error("failed to add pod to cache", zap.Error(err))
return
}
log.Logger().Debug("adding pod to cache", zap.String("podName", pod.Name))
if err := ctx.schedulerCache.AddPod(pod); err != nil {
log.Logger().Error("add pod to scheduler cache failed",
zap.String("podName", pod.Name),
zap.Error(err))
}
}
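// removePodFromCache removes a deleted pod from the external scheduler cache,
// handling the DeletedFinalStateUnknown tombstone case from the informer.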
func (ctx *Context) removePodFromCache(obj interface{}) {
var pod *v1.Pod
switch t := obj.(type) {
case *v1.Pod:
pod = t
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
if !ok {
log.Logger().Error("Cannot convert to *v1.Pod", zap.Any("pod", obj))
return
}
default:
log.Logger().Error("Cannot convert to *v1.Pod", zap.Any("pod", obj))
return
}
log.Logger().Debug("removing pod from cache", zap.String("podName", pod.Name))
if err := ctx.schedulerCache.RemovePod(pod); err != nil {
log.Logger().Debug("failed to remove pod from scheduler cache",
zap.String("podName", pod.Name),
zap.Error(err))
}
}
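// updatePodInCache updates a pod in the external scheduler cache when the pod informer reports a change.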
func (ctx *Context) updatePodInCache(oldObj, newObj interface{}) {
oldPod, err := utils.Convert2Pod(oldObj)
if err != nil {
log.Logger().Error("failed to update pod in cache", zap.Error(err))
return
}
newPod, err := utils.Convert2Pod(newObj)
if err != nil {
log.Logger().Error("failed to update pod in cache", zap.Error(err))
return
}
if err := ctx.schedulerCache.UpdatePod(oldPod, newPod); err != nil {
log.Logger().Debug("failed to update pod in cache",
zap.String("podName", oldPod.Name),
zap.Error(err))
}
}
// filter pods by scheduler name and state
func (ctx *Context) filterPods(obj interface{}) bool {
switch obj := obj.(type) {
case *v1.Pod:
// if a terminated pod were added to the cache, its requested
// resources would be counted against the cached node, making the
// node appear to use more resources than it actually does; this
// could only be corrected once the pod is removed (by triggering
// the pod delete), so terminated pods are filtered out here.
return utils.GeneralPodFilter(obj) &&
!utils.IsPodTerminated(obj)
default:
return false
}
}
// filter configMap for the scheduler
func (ctx *Context) filterConfigMaps(obj interface{}) bool {
switch obj := obj.(type) {
case *v1.ConfigMap:
return obj.Name == constants.DefaultConfigMapName
default:
return false
}
}
// when the configMap for the scheduler is added, trigger a hot-refresh
func (ctx *Context) addConfigMaps(obj interface{}) {
log.Logger().Debug("configMap added")
ctx.triggerReloadConfig()
}
// when the configMap for the scheduler is updated, trigger a hot-refresh
func (ctx *Context) updateConfigMaps(obj, newObj interface{}) {
if ctx.apiProvider.GetAPIs().Conf.EnableConfigHotRefresh {
log.Logger().Debug("trigger scheduler to reload configuration")
// When an update event is received, it is not guaranteed that the data mounted into
// the pod has been updated as well. The refresh of the pod's volume is handled by the
// kubelet, which checks whether the mounted ConfigMap is fresh on every periodic sync.
// As a result, the total delay from the moment the ConfigMap is updated to the moment
// new keys are projected into the pod can be as long as the kubelet sync period plus
// the TTL of the ConfigMap cache in the kubelet.
// We only trigger the configuration reload here; once called, the yunikorn-core side
// keeps checking the config file state, and the actual reload happens when it detects
// a change in the content.
ctx.triggerReloadConfig()
} else {
log.Logger().Warn("Skip to reload scheduler configuration")
}
}
// when the configMap for the scheduler is deleted, no operation is needed here;
// we assume a delete is followed by an add, so we treat it like an update.
func (ctx *Context) deleteConfigMaps(obj interface{}) {
log.Logger().Debug("configMap deleted")
}
func (ctx *Context) triggerReloadConfig() {
log.Logger().Info("trigger scheduler configuration reloading")
clusterId := ctx.apiProvider.GetAPIs().Conf.ClusterID
if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateConfiguration(clusterId); err != nil {
log.Logger().Error("reload configuration failed", zap.Error(err))
}
}
// IsPodFitNode evaluates the predicates for the given pod and node based on the current context
func (ctx *Context) IsPodFitNode(name, node string, allocate bool) error {
// simply skip if predicates are not enabled
if ctx.apiProvider.IsTestingMode() {
return nil
}
ctx.lock.RLock()
defer ctx.lock.RUnlock()
if pod, ok := ctx.schedulerCache.GetPod(name); ok {
// if pod exists in cache, try to run predicates
if targetNode := ctx.schedulerCache.GetNode(node); targetNode != nil {
_, err := ctx.predManager.Predicates(pod, targetNode, allocate)
return err
}
}
return fmt.Errorf("predicates were not running because pod or node was not found in cache")
}
// bindPodVolumes calls the volume binder to bind pod volumes if necessary.
// internally, the volume binder maintains a cache (podBindingCache) for pod volumes,
// which should have been updated by FindPodVolumes and AssumePodVolumes before this is called.
func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
podKey := string(pod.UID)
// AssumePodVolumes was already done when the pod was assumed (driven by the scheduler-core),
// because assumed pods are cached during the scheduling process and directly impact other
// scheduling decisions. when AssumePodVolumes was called, we cached in the schedulerCache
// whether all pod volumes were bound, so here we only need to retrieve that value from the
// cache and skip binding if the volumes are already bound.
if assumedPod, exist := ctx.schedulerCache.GetPod(podKey); exist {
if ctx.schedulerCache.ArePodVolumesAllBound(podKey) {
log.Logger().Info("Binding Pod Volumes skipped: all volumes already bound",
zap.String("podName", pod.Name))
} else {
log.Logger().Info("Binding Pod Volumes", zap.String("podName", pod.Name))
boundClaims, claimsToBind, unboundClaimsImmediate, err := ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
if err != nil {
log.Logger().Error("Failed to get pod volumes",
zap.String("podName", assumedPod.Name),
zap.Error(err))
return err
}
if len(unboundClaimsImmediate) > 0 {
err = fmt.Errorf("pod %s has unbound immediate claims", pod.Name)
log.Logger().Error("Pod has unbound immediate claims",
zap.String("podName", assumedPod.Name),
zap.Error(err))
return err
}
node, err := ctx.schedulerCache.GetNodeInfo(assumedPod.Spec.NodeName)
if err != nil {
log.Logger().Error("Failed to get node info",
zap.String("podName", assumedPod.Name),
zap.String("nodeName", assumedPod.Spec.NodeName),
zap.Error(err))
return err
}
volumes, reasons, err := ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, claimsToBind, node)
if err != nil {
log.Logger().Error("Failed to find pod volumes",
zap.String("podName", assumedPod.Name),
zap.String("nodeName", assumedPod.Spec.NodeName),
zap.Int("claimsToBind", len(claimsToBind)),
zap.Error(err))
return err
}
if len(reasons) > 0 {
sReasons := make([]string, 0)
for _, reason := range reasons {
sReasons = append(sReasons, string(reason))
}
sReason := strings.Join(sReasons, ", ")
err = fmt.Errorf("pod %s has conflicting volume claims: %s", pod.Name, sReason)
log.Logger().Error("Pod has conflicting volume claims",
zap.String("podName", assumedPod.Name),
zap.String("nodeName", assumedPod.Spec.NodeName),
zap.Int("claimsToBind", len(claimsToBind)),
zap.Error(err))
return err
}
if volumes.StaticBindings == nil {
// convert nil to empty array
volumes.StaticBindings = make([]*scheduling.BindingInfo, 0)
}
if volumes.DynamicProvisions == nil {
// convert nil to empty array
volumes.DynamicProvisions = make([]*v1.PersistentVolumeClaim, 0)
}
err = ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(assumedPod, volumes)
if err != nil {
log.Logger().Error("Failed to bind pod volumes",
zap.String("podName", assumedPod.Name),
zap.String("nodeName", assumedPod.Spec.NodeName),
zap.Int("dynamicProvisions", len(volumes.DynamicProvisions)),
zap.Int("staticBindings", len(volumes.StaticBindings)))
return err
}
}
}
return nil
}
// AssumePod assumes a pod will be running on a node. the scheduler maintains a cache
// that stores, for each node, the pods that are supposed to be running on it, and this
// cache is kept in sync between the core and the shim. this way, the core can make
// allocation decisions that take other assumed pods into account before they are
// actually bound to the node (binding is slow).
func (ctx *Context) AssumePod(name string, node string) error {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if pod, ok := ctx.schedulerCache.GetPod(name); ok {
// when adding an assumed pod, we make a copy of the pod to avoid modifying
// the original reference. otherwise, there could be a data race with other
// go-routines accessing it in parallel.
if targetNode := ctx.schedulerCache.GetNode(node); targetNode != nil {
assumedPod := pod.DeepCopy()
// assume pod volumes before assuming the pod;
// this updates the scheduler cache with the essential PV/PVC binding info
var allBound = true
// the volume binder might be nil in unit tests
if ctx.apiProvider.GetAPIs().VolumeBinder != nil {
var err error
boundClaims, claimsToBind, _, err := ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
if err != nil {
return err
}
volumes, _, err := ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(pod, boundClaims, claimsToBind, targetNode.Node())
if err != nil {
return err
}
allBound, err = ctx.apiProvider.GetAPIs().VolumeBinder.AssumePodVolumes(pod, node, volumes)
if err != nil {
return err
}
}
// assign the node name for pod
assumedPod.Spec.NodeName = node
return ctx.schedulerCache.AssumePod(assumedPod, allBound)
}
}
return nil
}
// ForgetPod must be called when a pod was assumed to be running on a node,
// but then failed to bind or was released for some reason.
func (ctx *Context) ForgetPod(name string) error {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if pod, ok := ctx.schedulerCache.GetPod(name); ok {
log.Logger().Debug("forget pod", zap.String("pod", pod.Name))
return ctx.schedulerCache.ForgetPod(pod)
}
log.Logger().Debug("unable to forget pod",
zap.String("reason", fmt.Sprintf("pod %s not found in scheduler cache", name)))
return nil
}
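// UpdateApplication adds or replaces the given application in the context.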
func (ctx *Context) UpdateApplication(app *Application) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
ctx.applications[app.applicationID] = app
}
// NotifyApplicationComplete informs the scheduler that the application is completed.
// the completed state may be further refined to completed_with_errors (failed) or successfully_completed;
// either way we need to release all allocations (if any exist) for this application.
func (ctx *Context) NotifyApplicationComplete(appID string) {
if app := ctx.GetApplication(appID); app != nil {
log.Logger().Debug("NotifyApplicationComplete",
zap.String("appID", appID),
zap.String("currentAppState", app.GetApplicationState()))
ev := NewSimpleApplicationEvent(appID, events.CompleteApplication)
dispatcher.Dispatch(ev)
}
}
func (ctx *Context) NotifyApplicationFail(appID string) {
if app := ctx.GetApplication(appID); app != nil {
log.Logger().Debug("NotifyApplicationFail",
zap.String("appID", appID),
zap.String("currentAppState", app.GetApplicationState()))
ev := NewSimpleApplicationEvent(appID, events.FailApplication)
dispatcher.Dispatch(ev)
}
}
func (ctx *Context) NotifyTaskComplete(appID, taskID string) {
log.Logger().Debug("NotifyTaskComplete",
zap.String("appID", appID),
zap.String("taskID", taskID))
if app := ctx.GetApplication(appID); app != nil {
log.Logger().Debug("release allocation",
zap.String("appID", appID),
zap.String("taskID", taskID))
ev := NewSimpleTaskEvent(appID, taskID, events.CompleteTask)
dispatcher.Dispatch(ev)
appEv := NewSimpleApplicationEvent(appID, events.AppTaskCompleted)
dispatcher.Dispatch(appEv)
}
}
// update the application tags in the AddApplicationRequest based on the namespace annotations.
// the following tags are added to the request if the corresponding annotations exist:
// - namespace.resourcequota
// - namespace.parentqueue
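// for example, if the namespace carries the annotation
// yunikorn.apache.org/parentqueue: root.sandbox (an illustrative queue name),
// the namespace.parentqueue tag is set to root.sandbox on the application metadata.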
func (ctx *Context) updateApplicationTags(request *interfaces.AddApplicationRequest, namespace string) {
namespaceObj := ctx.getNamespaceObject(namespace)
if namespaceObj == nil {
return
}
// add resource quota info as an app tag
resourceQuota := utils.GetNamespaceQuotaFromAnnotation(namespaceObj)
if resourceQuota != nil && !common.IsZero(resourceQuota) {
if quotaStr, err := json.Marshal(resourceQuota); err == nil {
request.Metadata.Tags[constants.AppTagNamespaceResourceQuota] = string(quotaStr)
}
}
// add parent queue info as an app tag
parentQueue := namespaceObj.Annotations["yunikorn.apache.org/parentqueue"]
if parentQueue != "" {
request.Metadata.Tags[constants.AppTagNamespaceParentQueue] = parentQueue
}
}
// getNamespaceObject returns the namespace object for the given namespace name.
// if the namespace cannot be listed from the api-server, nil is returned.
func (ctx *Context) getNamespaceObject(namespace string) *v1.Namespace {
if namespace == "" {
log.Logger().Debug("could not get namespace from empty string")
return nil
}
nsLister := ctx.apiProvider.GetAPIs().NamespaceInformer.Lister()
namespaceObj, err := nsLister.Get(namespace)
if err != nil {
// every app should belong to a namespace,
// if we cannot list the namespace here, probably something is wrong
// log an error here and skip retrieving the resource quota
log.Logger().Error("failed to get app namespace", zap.Error(err))
return nil
}
return namespaceObj
}
func (ctx *Context) AddApplication(request *interfaces.AddApplicationRequest) interfaces.ManagedApp {
log.Logger().Debug("AddApplication", zap.Any("Request", request))
if app := ctx.GetApplication(request.Metadata.ApplicationID); app != nil {
return app
}
ctx.lock.Lock()
defer ctx.lock.Unlock()
if ns, ok := request.Metadata.Tags[constants.AppTagNamespace]; ok {
log.Logger().Debug("app namespace info",
zap.String("appID", request.Metadata.ApplicationID),
zap.String("namespace", ns))
ctx.updateApplicationTags(request, ns)
}
app := NewApplication(
request.Metadata.ApplicationID,
request.Metadata.QueueName,
request.Metadata.User,
request.Metadata.Tags,
ctx.apiProvider.GetAPIs().SchedulerAPI)
app.setTaskGroups(request.Metadata.TaskGroups)
if request.Metadata.SchedulingPolicyParameters != nil {
app.SetPlaceholderTimeout(request.Metadata.SchedulingPolicyParameters.GetPlaceholderTimeout())
app.setSchedulingStyle(request.Metadata.SchedulingPolicyParameters.GetGangSchedulingStyle())
}
app.setOwnReferences(request.Metadata.OwnerReferences)
// add into cache
ctx.applications[app.applicationID] = app
log.Logger().Info("app added",
zap.String("appID", app.applicationID))
return app
}
func (ctx *Context) GetApplication(appID string) interfaces.ManagedApp {
ctx.lock.RLock()
defer ctx.lock.RUnlock()
if app, ok := ctx.applications[appID]; ok {
return app
}
return nil
}
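// RemoveApplication removes the application from the context and notifies the scheduler core;
// it returns an error if the application still has non-terminated tasks or is not found.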
func (ctx *Context) RemoveApplication(appID string) error {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if app, exist := ctx.applications[appID]; exist {
// get the aliases of the non-terminated tasks
nonTerminatedTaskAlias := app.getNonTerminatedTaskAlias()
// check whether there are any non-terminated tasks
if len(nonTerminatedTaskAlias) > 0 {
return fmt.Errorf("failed to remove application %s because it still has non-terminated tasks: %s", appID, strings.Join(nonTerminatedTaskAlias, ","))
}
// send the update request to scheduler core
rr := common.CreateUpdateRequestForRemoveApplication(app.applicationID, app.partition)
if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateApplication(&rr); err != nil {
log.Logger().Error("failed to send remove application request to core", zap.Error(err))
}
delete(ctx.applications, appID)
log.Logger().Info("app removed",
zap.String("appID", appID))
return nil
}
return fmt.Errorf("application %s is not found in the context", appID)
}
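// RemoveApplicationInternal removes the application from the context only,
// without sending a remove request to the scheduler core.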
func (ctx *Context) RemoveApplicationInternal(appID string) error {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if _, exist := ctx.applications[appID]; exist {
delete(ctx.applications, appID)
return nil
}
return fmt.Errorf("application %s is not found in the context", appID)
}
// this implements ApplicationManagementProtocol
func (ctx *Context) AddTask(request *interfaces.AddTaskRequest) interfaces.ManagedTask {
log.Logger().Debug("AddTask",
zap.String("appID", request.Metadata.ApplicationID),
zap.String("taskID", request.Metadata.TaskID))
if managedApp := ctx.GetApplication(request.Metadata.ApplicationID); managedApp != nil {
if app, valid := managedApp.(*Application); valid {
existingTask, err := app.GetTask(request.Metadata.TaskID)
if err != nil {
task := NewFromTaskMeta(request.Metadata.TaskID, app, ctx, request.Metadata)
app.addTask(task)
log.Logger().Info("task added",
zap.String("appID", app.applicationID),
zap.String("taskID", task.taskID),
zap.String("taskState", task.GetTaskState()))
return task
}
return existingTask
}
}
return nil
}
func (ctx *Context) RemoveTask(appID, taskID string) error {
ctx.lock.RLock()
defer ctx.lock.RUnlock()
if app, ok := ctx.applications[appID]; ok {
return app.removeTask(taskID)
}
return fmt.Errorf("application %s is not found in the context", appID)
}
func (ctx *Context) getTask(appID string, taskID string) (*Task, error) {
ctx.lock.RLock()
defer ctx.lock.RUnlock()
if app, ok := ctx.applications[appID]; ok {
if managedTask, err := app.GetTask(taskID); err == nil {
if task, valid := managedTask.(*Task); valid {
return task, nil
}
}
}
return nil, fmt.Errorf("application %s is not found in context", appID)
}
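// SelectApplications returns the applications that match the given filter;
// a nil filter returns all applications.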
func (ctx *Context) SelectApplications(filter func(app *Application) bool) []*Application {
ctx.lock.RLock()
defer ctx.lock.RUnlock()
apps := make([]*Application, 0)
for _, app := range ctx.applications {
if filter != nil && !filter(app) {
continue
}
apps = append(apps, app)
}
return apps
}
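// PublishEvents publishes event records received from the scheduler core as Kubernetes events,
// recorded against the task's pod for REQUEST events and against the node for NODE events.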
func (ctx *Context) PublishEvents(eventRecords []*si.EventRecord) {
if len(eventRecords) > 0 {
for _, record := range eventRecords {
switch record.Type {
case si.EventRecord_REQUEST:
taskID := record.ObjectID
appID := record.GroupID
if task, err := ctx.getTask(appID, taskID); err == nil {
events.GetRecorder().Event(task.GetTaskPod(),
v1.EventTypeNormal, record.Reason, record.Message)
} else {
log.Logger().Warn("task event is not published because task is not found",
zap.String("appID", appID),
zap.String("taskID", taskID),
zap.String("event", record.String()))
}
case si.EventRecord_NODE:
nodeID := record.ObjectID
nodeInfo := ctx.schedulerCache.GetNode(nodeID)
if nodeInfo == nil {
log.Logger().Warn("node event is not published because nodeInfo is not found",
zap.String("nodeID", nodeID),
zap.String("event", record.String()))
continue
}
node := nodeInfo.Node()
if node == nil {
log.Logger().Warn("node event is not published because node is not found",
zap.String("nodeID", nodeID),
zap.String("event", record.String()))
continue
}
events.GetRecorder().Event(node,
v1.EventTypeNormal, record.Reason, record.Message)
default:
log.Logger().Warn("Unsupported event type, currently only supports to publish request event records",
zap.String("type", record.Type.String()))
}
}
}
}
// updatePodCondition updates the task's pod condition if the condition has not been applied yet.
// it returns true if the update was done, and false if the update was skipped due to an error or a duplicate operation.
func (ctx *Context) updatePodCondition(task *Task, podCondition *v1.PodCondition) bool {
if task.GetTaskState() == events.States().Task.Scheduling {
// only update the pod when pod condition changes
// minimize the overhead added to the api-server/etcd
if !utils.PodUnderCondition(task.pod, podCondition) {
log.Logger().Debug("updating pod condition",
zap.String("namespace", task.pod.Namespace),
zap.String("name", task.pod.Name),
zap.Any("podCondition", podCondition))
// call api-server to do the pod condition update
if podutil.UpdatePodCondition(&task.pod.Status, podCondition) {
if !ctx.apiProvider.IsTestingMode() {
_, err := ctx.apiProvider.GetAPIs().KubeClient.GetClientSet().CoreV1().
Pods(task.pod.Namespace).UpdateStatus(context.Background(), task.pod, metav1.UpdateOptions{})
if err == nil {
return true
}
// only log the error here, no need to handle it if the update failed
log.Logger().Error("update pod condition failed",
zap.Error(err))
}
}
}
}
return false
}
// HandleContainerStateUpdate handles pod scheduling failures according to their causes,
// and updates the pod condition accordingly. the cluster autoscaler depends on a specific
// pod condition in order to trigger auto-scaling.
func (ctx *Context) HandleContainerStateUpdate(request *si.UpdateContainerSchedulingStateRequest) {
// the allocationKey equals the taskID
if task, err := ctx.getTask(request.ApplicartionID, request.AllocationKey); err == nil {
switch request.State {
case si.UpdateContainerSchedulingStateRequest_SKIPPED:
// the auto-scaler scans pods whose condition is PodScheduled=false && reason=Unschedulable;
// if the pod was skipped because the queue quota has been exceeded, we do not trigger auto-scaling
if ctx.updatePodCondition(task,
&v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: "SchedulingSkipped",
Message: request.Reason,
}) {
events.GetRecorder().Eventf(task.pod,
v1.EventTypeNormal, "PodUnschedulable",
"Task %s is skipped from scheduling because the queue quota has been exceed", task.alias)
}
case si.UpdateContainerSchedulingStateRequest_FAILED:
// set pod condition to Unschedulable in order to trigger auto-scaling
if ctx.updatePodCondition(task,
&v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Message: request.Reason,
}) {
events.GetRecorder().Eventf(task.pod,
v1.EventTypeNormal, "PodUnschedulable",
"Task %s is pending for the requested resources become available", task.alias)
}
default:
log.Logger().Warn("no handler for container scheduling state",
zap.String("state", request.State.String()))
}
}
}
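// ApplicationEventHandler returns the dispatcher callback that looks up the application
// referenced by the event and forwards the event to its state machine.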
func (ctx *Context) ApplicationEventHandler() func(obj interface{}) {
return func(obj interface{}) {
if event, ok := obj.(events.ApplicationEvent); ok {
managedApp := ctx.GetApplication(event.GetApplicationID())
if managedApp == nil {
log.Logger().Error("failed to handle application event",
zap.String("reason", "application not exist"))
return
}
if app, ok := managedApp.(*Application); ok {
if app.canHandle(event) {
if err := app.handle(event); err != nil {
log.Logger().Error("failed to handle application event",
zap.String("event", string(event.GetEvent())),
zap.Error(err))
}
}
}
}
}
}
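// TaskEventHandler returns the dispatcher callback that looks up the task
// referenced by the event and forwards the event to its state machine.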
func (ctx *Context) TaskEventHandler() func(obj interface{}) {
return func(obj interface{}) {
if event, ok := obj.(events.TaskEvent); ok {
task, err := ctx.getTask(event.GetApplicationID(), event.GetTaskID())
if err != nil {
log.Logger().Error("failed to handle application event", zap.Error(err))
return
}
if task.canHandle(event) {
if err = task.handle(event); err != nil {
log.Logger().Error("failed to handle task event",
zap.String("applicationID", task.applicationID),
zap.String("taskID", task.taskID),
zap.String("event", string(event.GetEvent())),
zap.Error(err))
}
}
}
}
}
func (ctx *Context) SchedulerNodeEventHandler() func(obj interface{}) {
if ctx != nil && ctx.nodes != nil {
return ctx.nodes.schedulerNodeEventHandler()
}
// this is not required in some tests
return nil
}
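// findYKConfigMap returns the yunikorn default configMap from the given list,
// or an error if it cannot be found.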
func findYKConfigMap(configMaps []*v1.ConfigMap) (*v1.ConfigMap, error) {
if len(configMaps) == 0 {
return nil, fmt.Errorf("configmap with label app:yunikorn not found")
}
for _, c := range configMaps {
if c.Name == constants.DefaultConfigMapName {
return c, nil
}
}
return nil, fmt.Errorf("configmap with name %s not found", constants.DefaultConfigMapName)
}
/*
SaveConfigmap saves the configMap and returns the old configuration in the response,
or a failure reason if the update could not be applied.
*/
func (ctx *Context) SaveConfigmap(request *si.UpdateConfigurationRequest) *si.UpdateConfigurationResponse {
// if hot-refresh is enabled, configMap change through the API is not allowed
if ctx.apiProvider.GetAPIs().Conf.EnableConfigHotRefresh {
return &si.UpdateConfigurationResponse{
Success: false,
Reason: fmt.Sprintf("hot-refresh is enabled. To use the API for configuration update, " +
"set enableConfigHotRefresh = false and restart the scheduler"),
}
}
slt := labels.SelectorFromSet(labels.Set{constants.LabelApp: "yunikorn"})
configMaps, err := ctx.apiProvider.GetAPIs().ConfigMapInformer.Lister().List(slt)
if err != nil {
return &si.UpdateConfigurationResponse{
Success: false,
Reason: err.Error(),
}
}
ykconf, err := findYKConfigMap(configMaps)
if err != nil {
return &si.UpdateConfigurationResponse{
Success: false,
Reason: err.Error(),
}
}
newConfData := map[string]string{"queues.yaml": strings.ReplaceAll(request.Configs, "\r\n", "\n")}
newConf := ykconf.DeepCopy()
oldConfData := ykconf.Data["queues.yaml"]
newConf.Data = newConfData
_, err = ctx.apiProvider.GetAPIs().KubeClient.GetClientSet().CoreV1().ConfigMaps(ykconf.Namespace).Update(context.Background(), newConf, metav1.UpdateOptions{})
if err != nil {
return &si.UpdateConfigurationResponse{
Success: false,
Reason: err.Error(),
}
}
log.Logger().Info("ConfigMap updated successfully")
return &si.UpdateConfigurationResponse{
Success: true,
OldConfig: oldConfData,
}
}